This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7516f4a  In HelixBrokerStarter, allow custom cluster change handlers 
(#4089)
7516f4a is described below

commit 7516f4a7f14525fcb000807914bacb010aff0c1a
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Apr 8 18:25:26 2019 -0700

    In HelixBrokerStarter, allow custom cluster change handlers (#4089)
    
    Motivation: all cluster change listeners should be added through the 
ClusterChangeMediator in order to perform deduplication and proactive checks
    
    Add ClusterChangeHandler interface
    Add methods in HelixBrokerStarter to allow custom cluster change handlers 
to be plugged in
    Split the responsibilities of the Helix managers on the broker side
    - Spectator: handles the custom change listeners, properties read/write
    - Participant: handles Helix functionality such as state transitions and 
messages
---
 ...okerResourceOnlineOfflineStateModelFactory.java |  30 ++---
 .../broker/broker/helix/ClusterChangeHandler.java  |   8 +-
 .../broker/broker/helix/ClusterChangeMediator.java |  33 ++++--
 .../broker/helix/ExternalViewChangeHandler.java    |  43 -------
 .../broker/broker/helix/HelixBrokerStarter.java    | 129 ++++++++++++++-------
 .../broker/helix/InstanceConfigChangeHandler.java  |  38 ------
 .../broker/helix/LiveInstanceChangeHandler.java    |   7 +-
 .../broker/queryquota/TableQueryQuotaManager.java  |  12 +-
 .../routing/HelixExternalViewBasedRouting.java     |  16 ++-
 9 files changed, 157 insertions(+), 159 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index 1916d86..1004f5a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -18,14 +18,9 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
-import java.util.List;
-import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -37,7 +32,6 @@ import 
org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,19 +47,16 @@ import static 
org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURC
 public class BrokerResourceOnlineOfflineStateModelFactory extends 
StateModelFactory<StateModel> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerResourceOnlineOfflineStateModelFactory.class);
 
-  private final HelixManager _helixManager;
-  private final HelixAdmin _helixAdmin;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final HelixDataAccessor _helixDataAccessor;
   private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
   private final TableQueryQuotaManager _tableQueryQuotaManager;
 
-  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
-
-  public BrokerResourceOnlineOfflineStateModelFactory(HelixManager 
helixManager,
-      ZkHelixPropertyStore<ZNRecord> propertyStore, 
HelixExternalViewBasedRouting helixExternalViewBasedRouting,
+  public 
BrokerResourceOnlineOfflineStateModelFactory(ZkHelixPropertyStore<ZNRecord> 
propertyStore,
+      HelixDataAccessor helixDataAccessor, HelixExternalViewBasedRouting 
helixExternalViewBasedRouting,
       TableQueryQuotaManager tableQueryQuotaManager) {
-    _helixManager = helixManager;
+    _helixDataAccessor = helixDataAccessor;
     _propertyStore = propertyStore;
-    _helixAdmin = helixManager.getClusterManagmentTool();
     _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
     _tableQueryQuotaManager = tableQueryQuotaManager;
   }
@@ -86,17 +77,14 @@ public class BrokerResourceOnlineOfflineStateModelFactory 
extends StateModelFact
     public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
       try {
         
LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeOnlineFromOffline() 
: " + message);
-        Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
         String tableName = message.getPartitionName();
-        HelixDataAccessor helixDataAccessor = 
_helixManager.getHelixDataAccessor();
-        List<InstanceConfig> instanceConfigList = 
helixDataAccessor.getChildValues(keyBuilder.instanceConfigs());
         TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableName);
 
         _helixExternalViewBasedRouting.markDataResourceOnline(tableConfig,
-            HelixHelper.getExternalViewForResource(_helixAdmin, 
_helixManager.getClusterName(), tableName),
-            instanceConfigList);
-        _tableQueryQuotaManager.initTableQueryQuota(tableConfig, HelixHelper
-            .getExternalViewForResource(_helixAdmin, 
_helixManager.getClusterName(), BROKER_RESOURCE_INSTANCE));
+            
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableName)),
+            
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs()));
+        _tableQueryQuotaManager.initTableQueryQuota(tableConfig,
+            
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
       } catch (Exception e) {
         LOGGER.error("Caught exception during OFFLINE -> ONLINE transition", 
e);
         Utils.rethrowException(e);
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
index e0a3e6c..d4b82da 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
@@ -18,10 +18,16 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
+import org.apache.helix.HelixConstants;
+
+
 /**
  * Handles cluster changes such as external view changes, instance config 
changes, live instance changes etc.
  */
 public interface ClusterChangeHandler {
 
-  void processClusterChange();
+  /**
+   * Processes the cluster change of the given type (e.g. EXTERNAL_VIEW, 
INSTANCE_CONFIG, LIVE_INSTANCE).
+   */
+  void processClusterChange(HelixConstants.ChangeType changeType);
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 0653ddc..25d5ff2 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -56,7 +56,7 @@ public class ClusterChangeMediator implements 
ExternalViewChangeListener, Instan
   // If no change got for 1 hour, proactively check changes
   private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
 
-  private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
+  private final Map<ChangeType, List<ClusterChangeHandler>> _changeHandlersMap;
   private final Map<ChangeType, Long> _lastChangeTimeMap = new HashMap<>();
   private final Map<ChangeType, Long> _lastProcessTimeMap = new HashMap<>();
 
@@ -64,12 +64,13 @@ public class ClusterChangeMediator implements 
ExternalViewChangeListener, Instan
 
   private volatile boolean _stopped = false;
 
-  public ClusterChangeMediator(Map<ChangeType, ClusterChangeHandler> 
changeHandlerMap, BrokerMetrics brokerMetrics) {
-    _changeHandlerMap = changeHandlerMap;
+  public ClusterChangeMediator(Map<ChangeType, List<ClusterChangeHandler>> 
changeHandlersMap,
+      BrokerMetrics brokerMetrics) {
+    _changeHandlersMap = changeHandlersMap;
 
     // Initialize last process time map
     long initTime = System.currentTimeMillis();
-    for (ChangeType changeType : changeHandlerMap.keySet()) {
+    for (ChangeType changeType : changeHandlersMap.keySet()) {
       _lastProcessTimeMap.put(changeType, initTime);
     }
 
@@ -78,9 +79,9 @@ public class ClusterChangeMediator implements 
ExternalViewChangeListener, Instan
       public void run() {
         while (!_stopped) {
           try {
-            for (Map.Entry<ChangeType, ClusterChangeHandler> entry : 
_changeHandlerMap.entrySet()) {
+            for (Map.Entry<ChangeType, List<ClusterChangeHandler>> entry : 
_changeHandlersMap.entrySet()) {
               ChangeType changeType = entry.getKey();
-              ClusterChangeHandler changeHandler = entry.getValue();
+              List<ClusterChangeHandler> changeHandlers = entry.getValue();
               long currentTime = System.currentTimeMillis();
               Long lastChangeTime;
               synchronized (_lastChangeTimeMap) {
@@ -89,13 +90,13 @@ public class ClusterChangeMediator implements 
ExternalViewChangeListener, Instan
               if (lastChangeTime != null) {
                 
brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime 
- lastChangeTime,
                     TimeUnit.MILLISECONDS);
-                processClusterChange(changeType, changeHandler);
+                processClusterChange(changeType, changeHandlers);
               } else {
                 long lastProcessTime = _lastProcessTimeMap.get(changeType);
                 if (currentTime - lastProcessTime > 
PROACTIVE_CHANGE_CHECK_INTERVAL_MS) {
                   LOGGER.info("Proactive check {} change", changeType);
                   
brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 
1L);
-                  processClusterChange(changeType, changeHandler);
+                  processClusterChange(changeType, changeHandlers);
                 }
               }
             }
@@ -116,10 +117,20 @@ public class ClusterChangeMediator implements 
ExternalViewChangeListener, Instan
     };
   }
 
-  private void processClusterChange(ChangeType changeType, 
ClusterChangeHandler changeHandler) {
+  private void processClusterChange(ChangeType changeType, 
List<ClusterChangeHandler> changeHandlers) {
     long startTime = System.currentTimeMillis();
     LOGGER.info("Start processing {} change", changeType);
-    changeHandler.processClusterChange();
+    for (ClusterChangeHandler changeHandler : changeHandlers) {
+      try {
+        long handlerStartTime = System.currentTimeMillis();
+        changeHandler.processClusterChange(changeType);
+        LOGGER.info("Finish handling {} change for handler: {} in {}ms", 
changeType, changeHandler.getClass().getName(),
+            System.currentTimeMillis() - handlerStartTime);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while handling {} change for handler: 
{}", changeType,
+            changeHandler.getClass().getName(), e);
+      }
+    }
     long endTime = System.currentTimeMillis();
     LOGGER.info("Finish processing {} change in {}ms", changeType, endTime - 
startTime);
     _lastProcessTimeMap.put(changeType, endTime);
@@ -191,7 +202,7 @@ public class ClusterChangeMediator implements 
ExternalViewChangeListener, Instan
       }
     } else {
       LOGGER.error("Cluster change handling thread is not alive, directly 
process the {} change", changeType);
-      processClusterChange(changeType, _changeHandlerMap.get(changeType));
+      processClusterChange(changeType, _changeHandlersMap.get(changeType));
     }
   }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java
deleted file mode 100644
index 1422953..0000000
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ExternalViewChangeHandler.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker.helix;
-
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
-import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
-
-
-/**
- * Cluster change handler for external view changes.
- */
-public class ExternalViewChangeHandler implements ClusterChangeHandler {
-  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
-
-  public ExternalViewChangeHandler(HelixExternalViewBasedRouting 
helixExternalViewBasedRouting,
-      TableQueryQuotaManager tableQueryQuotaManager) {
-    _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
-  }
-
-  @Override
-  public void processClusterChange() {
-    _helixExternalViewBasedRouting.processExternalViewChange();
-    _tableQueryQuotaManager.processQueryQuotaChange();
-  }
-}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index ce76f22..337a6c3 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -21,19 +21,19 @@ package org.apache.pinot.broker.broker.helix;
 import com.google.common.collect.ImmutableList;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
-import org.apache.helix.PreConnectCallback;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
@@ -61,16 +61,19 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class HelixBrokerStarter {
-
   private static final String PROPERTY_STORE = "PROPERTYSTORE";
 
+  // Spectator Helix manager handles the custom change listeners, properties 
read/write
   private final HelixManager _spectatorHelixManager;
-  private final HelixManager _helixManager;
-  private final HelixAdmin _helixAdmin;
+  // Participant Helix manager handles Helix functionality such as state 
transitions and messages
+  private final HelixManager _participantHelixManager;
+
   private final Configuration _pinotHelixProperties;
+  private final HelixAdmin _helixAdmin;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final HelixDataAccessor _helixDataAccessor;
   private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
   private final BrokerServerBuilder _brokerServerBuilder;
-  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
   private final MetricsRegistry _metricsRegistry;
   private final TableQueryQuotaManager _tableQueryQuotaManager;
@@ -117,66 +120,68 @@ public class HelixBrokerStarter {
     _spectatorHelixManager.connect();
     _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
     _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
+    _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
     _helixExternalViewBasedRouting = new 
HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
         pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
     _tableQueryQuotaManager = new 
TableQueryQuotaManager(_spectatorHelixManager);
     _liveInstanceChangeHandler = new 
LiveInstanceChangeHandler(_spectatorHelixManager);
-    Map<ChangeType, ClusterChangeHandler> clusterChangeHandlerMap = new 
HashMap<>();
-    clusterChangeHandlerMap.put(ChangeType.EXTERNAL_VIEW,
-        new ExternalViewChangeHandler(_helixExternalViewBasedRouting, 
_tableQueryQuotaManager));
-    clusterChangeHandlerMap
-        .put(ChangeType.INSTANCE_CONFIG, new 
InstanceConfigChangeHandler(_helixExternalViewBasedRouting));
-    clusterChangeHandlerMap.put(ChangeType.LIVE_INSTANCE, 
_liveInstanceChangeHandler);
     _brokerServerBuilder = startBroker(_pinotHelixProperties);
     _metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
-    _clusterChangeMediator =
-        new ClusterChangeMediator(clusterChangeHandlerMap, 
_brokerServerBuilder.getBrokerMetrics());
+
+    // Initialize cluster change mediator
+    Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap = new 
HashMap<>();
+    List<ClusterChangeHandler> externalViewChangeHandlers = new ArrayList<>();
+    externalViewChangeHandlers.add(_helixExternalViewBasedRouting);
+    externalViewChangeHandlers.add(_tableQueryQuotaManager);
+    
externalViewChangeHandlers.addAll(getCustomExternalViewChangeHandlers(_spectatorHelixManager));
+    changeHandlersMap.put(ChangeType.EXTERNAL_VIEW, 
externalViewChangeHandlers);
+    List<ClusterChangeHandler> instanceConfigChangeHandlers = new 
ArrayList<>();
+    instanceConfigChangeHandlers.add(_helixExternalViewBasedRouting);
+    
instanceConfigChangeHandlers.addAll(getCustomInstanceConfigChangeHandlers(_spectatorHelixManager));
+    changeHandlersMap.put(ChangeType.INSTANCE_CONFIG, 
instanceConfigChangeHandlers);
+    List<ClusterChangeHandler> liveInstanceChangeHandler = new ArrayList<>();
+    liveInstanceChangeHandler.add(_liveInstanceChangeHandler);
+    
liveInstanceChangeHandler.addAll(getCustomLiveInstanceChangeHandlers(_spectatorHelixManager));
+    changeHandlersMap.put(ChangeType.LIVE_INSTANCE, liveInstanceChangeHandler);
+    _clusterChangeMediator = new ClusterChangeMediator(changeHandlersMap, 
_brokerServerBuilder.getBrokerMetrics());
     _clusterChangeMediator.start();
     
_spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
     
_spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
     
_spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
 
     // Connect participant Helix manager.
-    _helixManager =
+    _participantHelixManager =
         HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId, 
InstanceType.PARTICIPANT, zkServers);
-    StateMachineEngine stateMachineEngine = 
_helixManager.getStateMachineEngine();
+    StateMachineEngine stateMachineEngine = 
_participantHelixManager.getStateMachineEngine();
     StateModelFactory<?> stateModelFactory =
-        new 
BrokerResourceOnlineOfflineStateModelFactory(_spectatorHelixManager, 
_propertyStore,
+        new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, 
_helixDataAccessor,
             _helixExternalViewBasedRouting, _tableQueryQuotaManager);
     stateMachineEngine
         
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
 stateModelFactory);
-    _helixManager.connect();
+    _participantHelixManager.connect();
     _tbiMessageHandler = new 
TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting,
         
_pinotHelixProperties.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
             
CommonConstants.Broker.DEFAULT_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL_MS));
-    _helixManager.getMessagingService()
+    _participantHelixManager.getMessagingService()
         
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), 
_tbiMessageHandler);
 
     addInstanceTagIfNeeded(helixClusterName, brokerId);
-    final double minResourcePercentForStartup = 
_pinotHelixProperties.getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
-        CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
 
     // Register the service status handler
+    double minResourcePercentForStartup = _pinotHelixProperties
+        
.getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
+            
CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
     ServiceStatus.setServiceStatusCallback(new 
ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
-        .of(new 
ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager,
 helixClusterName,
-                brokerId, minResourcePercentForStartup),
-            new 
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager,
 helixClusterName,
-                brokerId, minResourcePercentForStartup))));
+        .of(new 
ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_participantHelixManager,
+                helixClusterName, brokerId, minResourcePercentForStartup),
+            new 
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager,
+                helixClusterName, brokerId, minResourcePercentForStartup))));
 
-    
_brokerServerBuilder.getBrokerMetrics().addCallbackGauge("helix.connected", new 
Callable<Long>() {
-      @Override
-      public Long call()
-          throws Exception {
-        return _helixManager.isConnected() ? 1L : 0L;
-      }
-    });
+    _brokerServerBuilder.getBrokerMetrics()
+        .addCallbackGauge("helix.connected", () -> 
_participantHelixManager.isConnected() ? 1L : 0L);
 
-    _helixManager.addPreConnectCallback(new PreConnectCallback() {
-      @Override
-      public void onPreConnect() {
-        
_brokerServerBuilder.getBrokerMetrics().addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS,
 1L);
-      }
-    });
+    _participantHelixManager.addPreConnectCallback(() -> 
_brokerServerBuilder.getBrokerMetrics()
+        .addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
   }
 
   private void setupHelixSystemProperties() {
@@ -187,7 +192,8 @@ public class HelixBrokerStarter {
   }
 
   private void addInstanceTagIfNeeded(String clusterName, String instanceName) 
{
-    InstanceConfig instanceConfig = _helixAdmin.getInstanceConfig(clusterName, 
instanceName);
+    InstanceConfig instanceConfig =
+        
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(instanceName));
     List<String> instanceTags = instanceConfig.getTags();
     if (instanceTags == null || instanceTags.isEmpty()) {
       if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) 
{
@@ -226,6 +232,45 @@ public class HelixBrokerStarter {
     return brokerServerBuilder;
   }
 
+  /**
+   * To be overridden to plug in custom external view change handlers.
+   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
+   * handlers from running. For slow change handler, make it asynchronous.
+   *
+   * @param spectatorHelixManager Spectator Helix manager
+   * @return List of custom external view change handlers to plug in
+   */
+  @SuppressWarnings("unused")
+  protected List<ClusterChangeHandler> 
getCustomExternalViewChangeHandlers(HelixManager spectatorHelixManager) {
+    return Collections.emptyList();
+  }
+
+  /**
+   * To be overridden to plug in custom instance config change handlers.
+   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
+   * handlers from running. For slow change handler, make it asynchronous.
+   *
+   * @param spectatorHelixManager Spectator Helix manager
+   * @return List of custom instance config change handlers to plug in
+   */
+  @SuppressWarnings("unused")
+  protected List<ClusterChangeHandler> 
getCustomInstanceConfigChangeHandlers(HelixManager spectatorHelixManager) {
+    return Collections.emptyList();
+  }
+
+  /**
+   * To be overridden to plug in custom live instance change handlers.
+   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
+   * handlers from running. For slow change handler, make it asynchronous.
+   *
+   * @param spectatorHelixManager Spectator Helix manager
+   * @return List of custom live instance change handlers to plug in
+   */
+  @SuppressWarnings("unused")
+  protected List<ClusterChangeHandler> 
getCustomLiveInstanceChangeHandlers(HelixManager spectatorHelixManager) {
+    return Collections.emptyList();
+  }
+
   public AccessControlFactory getAccessControlFactory() {
     return _accessControlFactory;
   }
@@ -283,9 +328,9 @@ public class HelixBrokerStarter {
   public void shutdown() {
     LOGGER.info("Shutting down");
 
-    if (_helixManager != null) {
-      LOGGER.info("Disconnecting Helix manager");
-      _helixManager.disconnect();
+    if (_participantHelixManager != null) {
+      LOGGER.info("Disconnecting participant Helix manager");
+      _participantHelixManager.disconnect();
     }
 
     if (_spectatorHelixManager != null) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java
deleted file mode 100644
index bfba1af..0000000
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/InstanceConfigChangeHandler.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker.helix;
-
-import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
-
-
-/**
- * Cluster change handler for instance config changes.
- */
-public class InstanceConfigChangeHandler implements ClusterChangeHandler {
-  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-
-  public InstanceConfigChangeHandler(HelixExternalViewBasedRouting 
helixExternalViewBasedRouting) {
-    _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
-  }
-
-  @Override
-  public void processClusterChange() {
-    _helixExternalViewBasedRouting.processInstanceConfigChange();
-  }
-}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
index aaecb47..6bd1229 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
@@ -58,7 +60,10 @@ public class LiveInstanceChangeHandler implements 
ClusterChangeHandler {
   }
 
   @Override
-  public void processClusterChange() {
+  public void processClusterChange(HelixConstants.ChangeType changeType) {
+    Preconditions
+        .checkState(changeType == HelixConstants.ChangeType.LIVE_INSTANCE, 
"Illegal change type: " + changeType);
+
     // Skip processing live instance change for single-connection routing
     if (_connectionPool == null) {
       return;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
index d37670e..13f24b8 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
@@ -19,15 +19,18 @@
 package org.apache.pinot.broker.queryquota;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.RateLimiter;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.broker.broker.helix.ClusterChangeHandler;
 import org.apache.pinot.common.config.QuotaConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
@@ -43,7 +46,7 @@ import static 
org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURC
 import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 
 
-public class TableQueryQuotaManager {
+public class TableQueryQuotaManager implements ClusterChangeHandler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableQueryQuotaManager.class);
 
   private BrokerMetrics _brokerMetrics;
@@ -327,4 +330,11 @@ public class TableQueryQuotaManager {
         .info("Processed query quota change in {}ms, {} out of {} query quota 
configs rebuilt.", (endTime - startTime),
             numRebuilt, _rateLimiterMap.size());
   }
+
+  @Override
+  public void processClusterChange(HelixConstants.ChangeType changeType) {
+    Preconditions
+        .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW, 
"Illegal change type: " + changeType);
+    processQueryQuotaChange();
+  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
index e62a7ef..51a9556 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.routing;
 
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -32,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.AccessOption;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
@@ -39,6 +41,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.broker.broker.helix.ClusterChangeHandler;
 import org.apache.pinot.broker.routing.builder.RoutingTableBuilder;
 import org.apache.pinot.broker.routing.selector.SegmentSelector;
 import org.apache.pinot.broker.routing.selector.SegmentSelectorProvider;
@@ -64,7 +67,7 @@ import org.slf4j.LoggerFactory;
  * of an offline routing table and a realtime routing table, with the realtime 
routing table being aware of the
  * fact that there is both an hlc and llc one.
  */
-public class HelixExternalViewBasedRouting implements RoutingTable {
+public class HelixExternalViewBasedRouting implements ClusterChangeHandler, 
RoutingTable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixExternalViewBasedRouting.class);
 
   private final Map<String, RoutingTableBuilder> _routingTableBuilderMap;
@@ -598,4 +601,15 @@ public class HelixExternalViewBasedRouting implements 
RoutingTable {
 
     return JsonUtils.objectToPrettyString(ret);
   }
+
+  @Override
+  public void processClusterChange(HelixConstants.ChangeType changeType) {
+    Preconditions.checkState(changeType == 
HelixConstants.ChangeType.EXTERNAL_VIEW
+        || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal 
change type: " + changeType);
+    if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
+      processExternalViewChange();
+    } else {
+      processInstanceConfigChange();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to