This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0839fb1  Refactor HelixBrokerStarter to separate constructor and 
start() (#4100)
0839fb1 is described below

commit 0839fb1ee565abd7e6058e1e0fc7372cedd2fa0b
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Apr 17 13:11:06 2019 -0700

    Refactor HelixBrokerStarter to separate constructor and start() (#4100)
    
    For the starter class, the constructor should only set the properties
    and not start the service; starting should be done by start() instead.
    
    - Refactor HelixBrokerStarter to separate constructor and start()
    - Modify shutdown() to shut down all components properly
    - Change the APIs to plug in custom change handlers without
      extending the class
    
    NOTE: THIS IS A BACKWARD-INCOMPATIBLE CHANGE
    - Need to call start() separately in order to start the broker
    - Order of arguments changed in the constructor (the nullable argument
      is moved to the last position). This is intentional, so that users
      hit the incompatibility at compile time and then add the start() call
---
 .../pinot/broker/broker/BrokerServerBuilder.java   |   7 +-
 .../broker/broker/helix/ClusterChangeHandler.java  |  10 +
 .../broker/broker/helix/HelixBrokerStarter.java    | 339 ++++++++++-----------
 .../broker/helix/LiveInstanceChangeHandler.java    |  10 +-
 .../broker/queryquota/TableQueryQuotaManager.java  |  29 +-
 .../routing/HelixExternalViewBasedRouting.java     |  49 +--
 .../broker/broker/HelixBrokerStarterTest.java      |   3 +-
 .../queryquota/TableQueryQuotaManagerTest.java     |   3 +-
 .../broker/routing/RandomRoutingTableTest.java     |   5 +-
 .../pinot/broker/routing/RoutingTableTest.java     |  36 ++-
 .../apache/pinot/common/utils/CommonConstants.java |   2 +
 .../apache/pinot/controller/ControllerStarter.java |   3 +-
 .../pinot/integration/tests/ClusterTest.java       |   7 +-
 .../server/starter/helix/HelixServerStarter.java   |   3 +-
 .../tools/admin/command/StartBrokerCommand.java    |  10 +-
 .../pinot/tools/perf/PerfBenchmarkDriver.java      |   2 +-
 16 files changed, 276 insertions(+), 242 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
index 387e214..a63e954 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
@@ -109,11 +109,12 @@ public class BrokerServerBuilder {
     _state.set(State.STARTING);
 
     _brokerRequestHandler.start();
-    _brokerAdminApplication.start(_config
-        .getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));
+    int brokerQueryPort =
+        _config.getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT);
+    _brokerAdminApplication.start(brokerQueryPort);
 
     _state.set(State.RUNNING);
-    LOGGER.info("Pinot Broker started");
+    LOGGER.info("Pinot Broker is started and listening on port {} for API 
requests", brokerQueryPort);
   }
 
   public void stop() {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
index d4b82da..9d335a6 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
@@ -19,14 +19,24 @@
 package org.apache.pinot.broker.broker.helix;
 
 import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.annotations.InterfaceAudience;
+import org.apache.pinot.annotations.InterfaceStability;
 
 
 /**
  * Handles cluster changes such as external view changes, instance config 
changes, live instance changes etc.
  */
[email protected]
[email protected]
 public interface ClusterChangeHandler {
 
   /**
+   * Initializes the cluster change handler with the given connected Helix 
manager.
+   */
+  void init(HelixManager helixManager);
+
+  /**
    * Processes the cluster change of the given type (e.g. EXTERNAL_VIEW, 
INSTANCE_CONFIG, LIVE_INSTANCE).
    */
   void processClusterChange(HelixConstants.ChangeType changeType);
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 7bc9c8d..d684d09 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.broker.helix;
 import com.google.common.collect.ImmutableList;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +47,7 @@ import 
org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.ServiceStatus;
@@ -55,100 +55,161 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-/**
- * Helix Broker Startable
- *
- *
- */
+@SuppressWarnings("unused")
 public class HelixBrokerStarter {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixBrokerStarter.class);
   private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = 
"pinot.broker.routing.table";
 
-  // Spectator Helix manager handles the custom change listeners, properties 
read/write
-  private final HelixManager _spectatorHelixManager;
-  // Participant Helix manager handles Helix functionality such as state 
transitions and messages
-  private final HelixManager _participantHelixManager;
-
   private final Configuration _brokerConf;
-  private final HelixAdmin _helixAdmin;
-  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final HelixDataAccessor _helixDataAccessor;
-  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private final BrokerServerBuilder _brokerServerBuilder;
-  private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
-  private final MetricsRegistry _metricsRegistry;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
-  private final ClusterChangeMediator _clusterChangeMediator;
-  private final TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
-
-  // Set after broker is started, which is actually in the constructor.
+  private final String _clusterName;
+  private final String _zkServers;
+  private final String _brokerId;
+
+  private final List<ClusterChangeHandler> _externalViewChangeHandlers = new 
ArrayList<>();
+  private final List<ClusterChangeHandler> _instanceConfigChangeHandlers = new 
ArrayList<>();
+  private final List<ClusterChangeHandler> _liveInstanceChangeHandlers = new 
ArrayList<>();
+
+  // Spectator Helix manager handles the custom change listeners, properties 
read/write
+  private HelixManager _spectatorHelixManager;
+  private HelixAdmin _helixAdmin;
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private HelixDataAccessor _helixDataAccessor;
+
+  // Cluster change handlers
+  private HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
+  private TableQueryQuotaManager _tableQueryQuotaManager;
+  private LiveInstanceChangeHandler _liveInstanceChangeHandler;
+  private ClusterChangeMediator _clusterChangeMediator;
+
+  private BrokerServerBuilder _brokerServerBuilder;
   private AccessControlFactory _accessControlFactory;
+  private MetricsRegistry _metricsRegistry;
+
+  // Participant Helix manager handles Helix functionality such as state 
transitions and messages
+  private HelixManager _participantHelixManager;
+  private TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
 
-  public HelixBrokerStarter(String helixClusterName, String zkServer, 
Configuration brokerConf)
+  public HelixBrokerStarter(Configuration brokerConf, String clusterName, 
String zkServer)
       throws Exception {
-    this(null, helixClusterName, zkServer, brokerConf);
+    this(brokerConf, clusterName, zkServer, null);
   }
 
-  public HelixBrokerStarter(@Nullable String brokerHost, String 
helixClusterName, String zkServer,
-      Configuration brokerConf)
+  public HelixBrokerStarter(Configuration brokerConf, String clusterName, 
String zkServer, @Nullable String brokerHost)
       throws Exception {
-    LOGGER.info("Starting Pinot broker");
-
     _brokerConf = brokerConf;
+    setupHelixSystemProperties();
+
+    _clusterName = clusterName;
+
+    // Remove all white-spaces from the list of zkServers (if any).
+    _zkServers = zkServer.replaceAll("\\s+", "");
 
     if (brokerHost == null) {
       brokerHost = NetUtil.getHostAddress();
     }
-
-    String brokerId = 
_brokerConf.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
+    _brokerId = 
_brokerConf.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
         CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + 
_brokerConf
             .getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));
+    _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, 
_brokerId);
+  }
 
-    _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, 
brokerId);
-    setupHelixSystemProperties();
+  private void setupHelixSystemProperties() {
+    // NOTE: Helix will disconnect the manager and disable the instance if it 
detects flapping (too frequent disconnect
+    // from ZooKeeper). Setting flapping time window to a small value can 
avoid this from happening. Helix ignores the
+    // non-positive value, so set the default value as 1.
+    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _brokerConf
+        
.getString(CommonConstants.Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS,
+            CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
+  }
 
-    // Remove all white-spaces from the list of zkServers (if any).
-    String zkServers = zkServer.replaceAll("\\s+", "");
+  /**
+   * Adds an external view change handler to handle Helix external view change 
callbacks.
+   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
+   * handlers from running. For slow change handler, make it asynchronous.
+   */
+  public void addExternalViewChangeHandler(ClusterChangeHandler 
externalViewChangeHandler) {
+    _externalViewChangeHandlers.add(externalViewChangeHandler);
+  }
+
+  /**
+   * Adds an instance config change handler to handle Helix instance config 
change callbacks.
+   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
+   * handlers from running. For slow change handler, make it asynchronous.
+   */
+  public void addInstanceConfigChangeHandler(ClusterChangeHandler 
instanceConfigChangeHandler) {
+    _instanceConfigChangeHandlers.add(instanceConfigChangeHandler);
+  }
+
+  /**
+   * Adds a live instance change handler to handle Helix live instance change 
callbacks.
+   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
+   * handlers from running. For slow change handler, make it asynchronous.
+   */
+  public void addLiveInstanceChangeHandler(ClusterChangeHandler 
liveInstanceChangeHandler) {
+    _liveInstanceChangeHandlers.add(liveInstanceChangeHandler);
+  }
 
-    LOGGER.info("Connecting Helix components");
-    // Connect spectator Helix manager.
+  // TODO: refactor this logic into BrokerServerBuilder
+  public void start()
+      throws Exception {
+    LOGGER.info("Starting Pinot broker");
+
+    // Connect the spectator Helix manager
+    LOGGER.info("Connecting spectator Helix manager");
     _spectatorHelixManager =
-        HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId, 
InstanceType.SPECTATOR, zkServers);
+        HelixManagerFactory.getZKHelixManager(_clusterName, _brokerId, 
InstanceType.SPECTATOR, _zkServers);
     _spectatorHelixManager.connect();
     _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
     _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
     _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
-    _helixExternalViewBasedRouting = new 
HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
-        brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
-    _tableQueryQuotaManager = new 
TableQueryQuotaManager(_spectatorHelixManager);
-    _liveInstanceChangeHandler = new 
LiveInstanceChangeHandler(_spectatorHelixManager);
-    _brokerServerBuilder = startBroker(_brokerConf);
+    _helixExternalViewBasedRouting =
+        new 
HelixExternalViewBasedRouting(_brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
+    _helixExternalViewBasedRouting.init(_spectatorHelixManager);
+    _tableQueryQuotaManager = new TableQueryQuotaManager();
+    _tableQueryQuotaManager.init(_spectatorHelixManager);
+    _liveInstanceChangeHandler = new LiveInstanceChangeHandler();
+    _liveInstanceChangeHandler.init(_spectatorHelixManager);
+
+    // Set up the broker server builder
+    LOGGER.info("Setting up broker server builder");
+    _brokerServerBuilder = new BrokerServerBuilder(_brokerConf, 
_helixExternalViewBasedRouting,
+        _helixExternalViewBasedRouting.getTimeBoundaryService(), 
_liveInstanceChangeHandler, _tableQueryQuotaManager);
+    _accessControlFactory = _brokerServerBuilder.getAccessControlFactory();
     _metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
-
-    // Initialize cluster change mediator
-    Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap = new 
HashMap<>();
-    List<ClusterChangeHandler> externalViewChangeHandlers = new ArrayList<>();
-    externalViewChangeHandlers.add(_helixExternalViewBasedRouting);
-    externalViewChangeHandlers.add(_tableQueryQuotaManager);
-    
externalViewChangeHandlers.addAll(getCustomExternalViewChangeHandlers(_spectatorHelixManager));
-    changeHandlersMap.put(ChangeType.EXTERNAL_VIEW, 
externalViewChangeHandlers);
-    List<ClusterChangeHandler> instanceConfigChangeHandlers = new 
ArrayList<>();
-    instanceConfigChangeHandlers.add(_helixExternalViewBasedRouting);
-    
instanceConfigChangeHandlers.addAll(getCustomInstanceConfigChangeHandlers(_spectatorHelixManager));
-    changeHandlersMap.put(ChangeType.INSTANCE_CONFIG, 
instanceConfigChangeHandlers);
-    List<ClusterChangeHandler> liveInstanceChangeHandler = new ArrayList<>();
-    liveInstanceChangeHandler.add(_liveInstanceChangeHandler);
-    
liveInstanceChangeHandler.addAll(getCustomLiveInstanceChangeHandlers(_spectatorHelixManager));
-    changeHandlersMap.put(ChangeType.LIVE_INSTANCE, liveInstanceChangeHandler);
-    _clusterChangeMediator = new ClusterChangeMediator(changeHandlersMap, 
_brokerServerBuilder.getBrokerMetrics());
+    BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics();
+    _helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics);
+    _tableQueryQuotaManager.setBrokerMetrics(brokerMetrics);
+    _brokerServerBuilder.start();
+
+    // Initialize the cluster change mediator
+    LOGGER.info("Initializing cluster change mediator");
+    for (ClusterChangeHandler externalViewChangeHandler : 
_externalViewChangeHandlers) {
+      externalViewChangeHandler.init(_spectatorHelixManager);
+    }
+    _externalViewChangeHandlers.add(_helixExternalViewBasedRouting);
+    _externalViewChangeHandlers.add(_tableQueryQuotaManager);
+    for (ClusterChangeHandler instanceConfigChangeHandler : 
_instanceConfigChangeHandlers) {
+      instanceConfigChangeHandler.init(_spectatorHelixManager);
+    }
+    _instanceConfigChangeHandlers.add(_helixExternalViewBasedRouting);
+    for (ClusterChangeHandler liveInstanceChangeHandler : 
_liveInstanceChangeHandlers) {
+      liveInstanceChangeHandler.init(_spectatorHelixManager);
+    }
+    _liveInstanceChangeHandlers.add(_liveInstanceChangeHandler);
+    Map<ChangeType, List<ClusterChangeHandler>> clusterChangeHandlersMap = new 
HashMap<>();
+    clusterChangeHandlersMap.put(ChangeType.EXTERNAL_VIEW, 
_externalViewChangeHandlers);
+    clusterChangeHandlersMap.put(ChangeType.INSTANCE_CONFIG, 
_instanceConfigChangeHandlers);
+    clusterChangeHandlersMap.put(ChangeType.LIVE_INSTANCE, 
_liveInstanceChangeHandlers);
+    _clusterChangeMediator = new 
ClusterChangeMediator(clusterChangeHandlersMap, brokerMetrics);
     _clusterChangeMediator.start();
     
_spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
     
_spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
     
_spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
 
-    // Connect participant Helix manager.
+    // Connect the participant Helix manager
+    LOGGER.info("Connecting participant Helix manager");
     _participantHelixManager =
-        HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId, 
InstanceType.PARTICIPANT, zkServers);
+        HelixManagerFactory.getZKHelixManager(_clusterName, _brokerId, 
InstanceType.PARTICIPANT, _zkServers);
     StateMachineEngine stateMachineEngine = 
_participantHelixManager.getStateMachineEngine();
     StateModelFactory<?> stateModelFactory =
         new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, 
_helixDataAccessor,
@@ -161,110 +222,69 @@ public class HelixBrokerStarter {
             
CommonConstants.Broker.DEFAULT_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL_MS));
     _participantHelixManager.getMessagingService()
         
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), 
_tbiMessageHandler);
-
-    addInstanceTagIfNeeded(helixClusterName, brokerId);
+    addInstanceTagIfNeeded();
+    
brokerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
+        () -> _participantHelixManager.isConnected() ? 1L : 0L);
+    _participantHelixManager
+        .addPreConnectCallback(() -> 
brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 
1L));
 
     // Register the service status handler
+    LOGGER.info("Registering service status handler");
     double minResourcePercentForStartup = _brokerConf
         
.getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
             
CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
     ServiceStatus.setServiceStatusCallback(new 
ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
         .of(new 
ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_participantHelixManager,
-                helixClusterName, brokerId, minResourcePercentForStartup),
+                _clusterName, _brokerId, minResourcePercentForStartup),
             new 
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager,
-                helixClusterName, brokerId, minResourcePercentForStartup))));
+                _clusterName, _brokerId, minResourcePercentForStartup))));
 
-    _brokerServerBuilder.getBrokerMetrics()
-        .addCallbackGauge("helix.connected", () -> 
_participantHelixManager.isConnected() ? 1L : 0L);
-
-    _participantHelixManager.addPreConnectCallback(() -> 
_brokerServerBuilder.getBrokerMetrics()
-        .addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
-  }
-
-  private void setupHelixSystemProperties() {
-    // NOTE: Helix will disconnect the manager and disable the instance if it 
detects flapping (too frequent disconnect
-    // from ZooKeeper). Setting flapping time window to a small value can 
avoid this from happening. Helix ignores the
-    // non-positive value, so set the default value as 1.
-    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _brokerConf
-        
.getString(CommonConstants.Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS,
-            CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
+    LOGGER.info("Finish starting Pinot broker");
   }
 
-  private void addInstanceTagIfNeeded(String clusterName, String instanceName) 
{
+  private void addInstanceTagIfNeeded() {
     InstanceConfig instanceConfig =
-        
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(instanceName));
+        
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(_brokerId));
     List<String> instanceTags = instanceConfig.getTags();
     if (instanceTags == null || instanceTags.isEmpty()) {
       if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) 
{
-        _helixAdmin.addInstanceTag(clusterName, instanceName,
+        _helixAdmin.addInstanceTag(_clusterName, _brokerId,
             
TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
       } else {
-        _helixAdmin.addInstanceTag(clusterName, instanceName, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+        _helixAdmin.addInstanceTag(_clusterName, _brokerId, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
       }
     }
   }
 
-  private BrokerServerBuilder startBroker(Configuration config) {
-    BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, 
_helixExternalViewBasedRouting,
-        _helixExternalViewBasedRouting.getTimeBoundaryService(), 
_liveInstanceChangeHandler, _tableQueryQuotaManager);
-    _accessControlFactory = brokerServerBuilder.getAccessControlFactory();
-    
_helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
-    
_tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
-    brokerServerBuilder.start();
-
-    LOGGER.info("Pinot broker ready and listening on port {} for API requests",
-        config.getProperty("pinot.broker.client.queryPort"));
-
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        try {
-          brokerServerBuilder.stop();
-        } catch (final Exception e) {
-          LOGGER.error("Caught exception while running shutdown hook", e);
-        }
-      }
-    });
-    return brokerServerBuilder;
-  }
+  public void shutdown() {
+    LOGGER.info("Shutting down Pinot broker");
 
-  /**
-   * To be overridden to plug in custom external view change handlers.
-   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
-   * handlers from running. For slow change handler, make it asynchronous.
-   *
-   * @param spectatorHelixManager Spectator Helix manager
-   * @return List of custom external view change handlers to plug in
-   */
-  @SuppressWarnings("unused")
-  protected List<ClusterChangeHandler> 
getCustomExternalViewChangeHandlers(HelixManager spectatorHelixManager) {
-    return Collections.emptyList();
-  }
+    if (_tbiMessageHandler != null) {
+      LOGGER.info("Shutting down time boundary info refresh message handler");
+      _tbiMessageHandler.shutdown();
+    }
 
-  /**
-   * To be overridden to plug in custom instance config change handlers.
-   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
-   * handlers from running. For slow change handler, make it asynchronous.
-   *
-   * @param spectatorHelixManager Spectator Helix manager
-   * @return List of custom instance config change handlers to plug in
-   */
-  @SuppressWarnings("unused")
-  protected List<ClusterChangeHandler> 
getCustomInstanceConfigChangeHandlers(HelixManager spectatorHelixManager) {
-    return Collections.emptyList();
-  }
+    if (_participantHelixManager != null) {
+      LOGGER.info("Disconnecting participant Helix manager");
+      _participantHelixManager.disconnect();
+    }
 
-  /**
-   * To be overridden to plug in custom live instance change handlers.
-   * <p>NOTE: all change handlers will be run in a single thread, so any slow 
change handler can block other change
-   * handlers from running. For slow change handler, make it asynchronous.
-   *
-   * @param spectatorHelixManager Spectator Helix manager
-   * @return List of custom live instance change handlers to plug in
-   */
-  @SuppressWarnings("unused")
-  protected List<ClusterChangeHandler> 
getCustomLiveInstanceChangeHandlers(HelixManager spectatorHelixManager) {
-    return Collections.emptyList();
+    if (_clusterChangeMediator != null) {
+      LOGGER.info("Stopping cluster change mediator");
+      _clusterChangeMediator.stop();
+    }
+
+    if (_brokerServerBuilder != null) {
+      LOGGER.info("Stopping broker server builder");
+      _brokerServerBuilder.stop();
+    }
+
+    if (_spectatorHelixManager != null) {
+      LOGGER.info("Disconnecting spectator Helix manager");
+      _spectatorHelixManager.disconnect();
+    }
+
+    LOGGER.info("Finish shutting down Pinot broker");
   }
 
   public AccessControlFactory getAccessControlFactory() {
@@ -283,42 +303,21 @@ public class HelixBrokerStarter {
     return _brokerServerBuilder;
   }
 
-  public static HelixBrokerStarter startDefault()
+  public MetricsRegistry getMetricsRegistry() {
+    return _metricsRegistry;
+  }
+
+  public static HelixBrokerStarter getDefault()
       throws Exception {
     Configuration brokerConf = new BaseConfiguration();
     int port = 5001;
     brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
port);
     brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 
60 * 1000L);
-    return new HelixBrokerStarter(null, "quickstart", "localhost:2122", 
brokerConf);
-  }
-
-  public void shutdown() {
-    LOGGER.info("Shutting down");
-
-    if (_participantHelixManager != null) {
-      LOGGER.info("Disconnecting participant Helix manager");
-      _participantHelixManager.disconnect();
-    }
-
-    if (_spectatorHelixManager != null) {
-      LOGGER.info("Disconnecting spectator Helix manager");
-      _spectatorHelixManager.disconnect();
-    }
-
-    if (_tbiMessageHandler != null) {
-      LOGGER.info("Shutting down timeboundary info refresh message handler");
-      _tbiMessageHandler.shutdown();
-    }
-
-    _clusterChangeMediator.stop();
-  }
-
-  public MetricsRegistry getMetricsRegistry() {
-    return _metricsRegistry;
+    return new HelixBrokerStarter(brokerConf, "quickstart", "localhost:2122");
   }
 
   public static void main(String[] args)
       throws Exception {
-    startDefault();
+    getDefault().start();
   }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
index 6bd1229..5162ce4 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/LiveInstanceChangeHandler.java
@@ -43,15 +43,17 @@ public class LiveInstanceChangeHandler implements 
ClusterChangeHandler {
 
   private static final boolean DO_NOT_RECREATE = false;
 
-  private final HelixDataAccessor _helixDataAccessor;
-  private final PropertyKey _liveInstancesKey;
+  private HelixDataAccessor _helixDataAccessor;
+  private PropertyKey _liveInstancesKey;
 
   private KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> 
_connectionPool;
   private Map<String, String> _liveInstanceToSessionIdMap;
 
-  public LiveInstanceChangeHandler(HelixManager helixManager) {
+  @Override
+  public void init(HelixManager helixManager) {
+    Preconditions.checkState(_helixDataAccessor == null, 
"LiveInstanceChangeHandler is already initialized");
     _helixDataAccessor = helixManager.getHelixDataAccessor();
-    _liveInstancesKey = new 
PropertyKey.Builder(helixManager.getClusterName()).liveInstances();
+    _liveInstancesKey = _helixDataAccessor.keyBuilder().liveInstances();
   }
 
   public void 
init(KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> 
connectionPool) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
index 13f24b8..1a685d2 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
@@ -48,17 +48,25 @@ import static 
org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 
 public class TableQueryQuotaManager implements ClusterChangeHandler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableQueryQuotaManager.class);
+  private static final int TIME_RANGE_IN_SECOND = 1;
+
+  private final AtomicInteger _lastKnownBrokerResourceVersion = new 
AtomicInteger(-1);
+  private final Map<String, QueryQuotaConfig> _rateLimiterMap = new 
ConcurrentHashMap<>();
 
+  private HelixManager _helixManager;
   private BrokerMetrics _brokerMetrics;
-  private final HelixManager _helixManager;
-  private final AtomicInteger _lastKnownBrokerResourceVersion;
-  private final Map<String, QueryQuotaConfig> _rateLimiterMap;
-  private static final int TIME_RANGE_IN_SECOND = 1;
 
-  public TableQueryQuotaManager(HelixManager helixManager) {
+  @Override
+  public void init(HelixManager helixManager) {
+    Preconditions.checkState(_helixManager == null, "TableQueryQuotaManager is 
already initialized");
     _helixManager = helixManager;
-    _rateLimiterMap = new ConcurrentHashMap<>();
-    _lastKnownBrokerResourceVersion = new AtomicInteger();
+  }
+
+  @Override
+  public void processClusterChange(HelixConstants.ChangeType changeType) {
+    Preconditions
+        .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW, 
"Illegal change type: " + changeType);
+    processQueryQuotaChange();
   }
 
   /**
@@ -330,11 +338,4 @@ public class TableQueryQuotaManager implements 
ClusterChangeHandler {
         .info("Processed query quota change in {}ms, {} out of {} query quota 
configs rebuilt.", (endTime - startTime),
             numRebuilt, _rateLimiterMap.size());
   }
-
-  @Override
-  public void processClusterChange(HelixConstants.ChangeType changeType) {
-    Preconditions
-        .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW, 
"Illegal change type: " + changeType);
-    processQueryQuotaChange();
-  }
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
index 51a9556..f2e3f08 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedRouting.java
@@ -69,37 +69,49 @@ import org.slf4j.LoggerFactory;
  */
 public class HelixExternalViewBasedRouting implements ClusterChangeHandler, 
RoutingTable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixExternalViewBasedRouting.class);
+  private static final int INVALID_EXTERNAL_VIEW_VERSION = Integer.MIN_VALUE;
 
-  private final Map<String, RoutingTableBuilder> _routingTableBuilderMap;
-
+  private final Map<String, RoutingTableBuilder> _routingTableBuilderMap = new 
ConcurrentHashMap<>();
   private final Map<String, Integer> _lastKnownExternalViewVersionMap = new 
ConcurrentHashMap<>();
   private final Map<String, Map<String, InstanceConfig>> 
_lastKnownInstanceConfigsForTable = new ConcurrentHashMap<>();
   private final Map<String, InstanceConfig> _lastKnownInstanceConfigs = new 
ConcurrentHashMap<>();
   private final Map<String, Set<String>> _tablesForInstance = new 
ConcurrentHashMap<>();
   private final Map<String, SegmentSelector> _segmentSelectorMap = new 
ConcurrentHashMap<>();
 
-  private final HelixExternalViewBasedTimeBoundaryService _timeBoundaryService;
-  private final HelixManager _helixManager;
-  private static final int INVALID_EXTERNAL_VIEW_VERSION = Integer.MIN_VALUE;
-
-  private BrokerMetrics _brokerMetrics;
-
-  private Configuration _configuration;
+  private final Configuration _configuration;
 
+  private HelixManager _helixManager;
+  private HelixExternalViewBasedTimeBoundaryService _timeBoundaryService;
   private RoutingTableBuilderFactory _routingTableBuilderFactory;
   private SegmentSelectorProvider _segmentSelectorProvider;
+  private BrokerMetrics _brokerMetrics;
 
-  public HelixExternalViewBasedRouting(ZkHelixPropertyStore<ZNRecord> 
propertyStore, HelixManager helixManager,
-      Configuration configuration) {
+  public HelixExternalViewBasedRouting(Configuration configuration) {
     _configuration = configuration;
-    _timeBoundaryService = new 
HelixExternalViewBasedTimeBoundaryService(propertyStore);
-    _routingTableBuilderMap = new HashMap<>();
+  }
+
+  @Override
+  public void init(HelixManager helixManager) {
+    Preconditions.checkState(_helixManager == null, 
"HelixExternalViewBasedRouting is already initialized");
     _helixManager = helixManager;
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
_helixManager.getHelixPropertyStore();
+    _timeBoundaryService = new 
HelixExternalViewBasedTimeBoundaryService(propertyStore);
     _routingTableBuilderFactory = new 
RoutingTableBuilderFactory(_configuration, propertyStore);
     _segmentSelectorProvider = new SegmentSelectorProvider(propertyStore);
   }
 
   @Override
+  public void processClusterChange(HelixConstants.ChangeType changeType) {
+    Preconditions.checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW
+        || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal change type: " + changeType);
+    if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
+      processExternalViewChange();
+    } else {
+      processInstanceConfigChange();
+    }
+  }
+
+  @Override
   public Map<String, List<String>> getRoutingTable(RoutingTableLookupRequest 
request) {
     String tableName = request.getTableName();
     RoutingTableBuilder routingTableBuilder = 
_routingTableBuilderMap.get(tableName);
@@ -601,15 +613,4 @@ public class HelixExternalViewBasedRouting implements ClusterChangeHandler, Rout
 
     return JsonUtils.objectToPrettyString(ret);
   }
-
-  @Override
-  public void processClusterChange(HelixConstants.ChangeType changeType) {
-    Preconditions.checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW
-        || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal change type: " + changeType);
-    if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
-      processExternalViewChange();
-    } else {
-      processInstanceConfigChange();
-    }
-  }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index b3a0452..1dfa1d0 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -74,7 +74,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
 
     _brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
8943);
     
_brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
 100L);
-    _helixBrokerStarter = new HelixBrokerStarter(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, _brokerConf);
+    _helixBrokerStarter = new HelixBrokerStarter(_brokerConf, 
getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR);
+    _helixBrokerStarter.start();
 
     ControllerRequestBuilderUtil
         .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, 5, true);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
index 1b5d709..3e5eae2 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
@@ -62,7 +62,8 @@ public class TableQueryQuotaManagerTest {
     _helixManager = initHelixManager(helixClusterName);
     _testPropertyStore = _helixManager.getHelixPropertyStore();
 
-    _tableQueryQuotaManager = new TableQueryQuotaManager(_helixManager);
+    _tableQueryQuotaManager = new TableQueryQuotaManager();
+    _tableQueryQuotaManager.init(_helixManager);
   }
 
   private HelixManager initHelixManager(String helixClusterName) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java
index ac02919..5b5cdd6 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RandomRoutingTableTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.io.IOUtils;
+import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.model.ExternalView;
@@ -35,6 +36,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableConfig.Builder;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -58,7 +60,8 @@ public class RandomRoutingTableTest {
     int numSegmentsInEV = externalView.getPartitionSet().size();
     int numServersInEV = instanceConfigs.size();
 
-    HelixExternalViewBasedRouting routing = new 
HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
+    HelixExternalViewBasedRouting routing = new 
HelixExternalViewBasedRouting(new BaseConfiguration());
+    routing.init(Mockito.mock(HelixManager.class));
     routing.markDataResourceOnline(generateTableConfig(tableName), 
externalView, instanceConfigs);
 
     for (int i = 0; i < NUM_ROUNDS; i++) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java
index c89b1c2..e583799 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/RoutingTableTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.helix.HelixManager;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import 
org.apache.pinot.broker.routing.builder.HighLevelConsumerBasedRoutingTableBuilder;
@@ -41,6 +42,7 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -51,7 +53,8 @@ public class RoutingTableTest {
   @Test
   public void testHelixExternalViewBasedRoutingTable()
       throws Exception {
-    HelixExternalViewBasedRouting routingTable = new 
HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
+    HelixExternalViewBasedRouting routingTable = new 
HelixExternalViewBasedRouting(new BaseConfiguration());
+    routingTable.init(Mockito.mock(HelixManager.class));
 
     ExternalView externalView = new ExternalView("testResource0_OFFLINE");
     externalView.setState("segment0", "dataServer_instance_0", "ONLINE");
@@ -113,20 +116,22 @@ public class RoutingTableTest {
 
     final MutableBoolean timeBoundaryUpdated = new MutableBoolean(false);
 
-    HelixExternalViewBasedRouting routingTable =
-        new HelixExternalViewBasedRouting(propertyStore, null, new 
BaseConfiguration()) {
-          @Override
-          protected ExternalView fetchExternalView(String table) {
-            return offlineExternalView;
-          }
+    HelixExternalViewBasedRouting routingTable = new 
HelixExternalViewBasedRouting(new BaseConfiguration()) {
+      @Override
+      protected ExternalView fetchExternalView(String table) {
+        return offlineExternalView;
+      }
 
-          @Override
-          protected void updateTimeBoundary(String tableName, ExternalView 
externalView) {
-            if (tableName.equals("myTable_OFFLINE")) {
-              timeBoundaryUpdated.setValue(true);
-            }
-          }
-        };
+      @Override
+      protected void updateTimeBoundary(String tableName, ExternalView 
externalView) {
+        if (tableName.equals("myTable_OFFLINE")) {
+          timeBoundaryUpdated.setValue(true);
+        }
+      }
+    };
+    HelixManager helixManager = Mockito.mock(HelixManager.class);
+    
Mockito.when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    routingTable.init(helixManager);
     routingTable.setBrokerMetrics(new BrokerMetrics(new MetricsRegistry()));
 
     Assert.assertFalse(timeBoundaryUpdated.booleanValue());
@@ -166,7 +171,8 @@ public class RoutingTableTest {
 
     final LLCSegmentName llcSegmentName = new LLCSegmentName("testResource0", 
2, 65, System.currentTimeMillis());
 
-    HelixExternalViewBasedRouting routingTable = new 
HelixExternalViewBasedRouting(null, null, new BaseConfiguration());
+    HelixExternalViewBasedRouting routingTable = new 
HelixExternalViewBasedRouting(new BaseConfiguration());
+    routingTable.init(Mockito.mock(HelixManager.class));
 
     Field realtimeRTBField = 
HelixExternalViewBasedRouting.class.getDeclaredField("_realtimeHLCRoutingTableBuilder");
     realtimeRTBField.setAccessible(true);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 62ed53f..8ed7999 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -29,6 +29,8 @@ public class CommonConstants {
   public static class Helix {
     public static final String IS_SHUTDOWN_IN_PROGRESS = "shutdownInProgress";
 
+    public static final String INSTANCE_CONNECTED_METRIC_NAME = 
"helix.connected";
+
     public static final String PREFIX_OF_SERVER_INSTANCE = "Server_";
     public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_";
     public static final String PREFIX_OF_CONTROLLER_INSTANCE = "Controller_";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 8f57c11..4a900c6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -219,7 +219,8 @@ public class ControllerStarter {
         .setup(_helixClusterName, _helixZkURL, _instanceId, 
_isUpdateStateModel, _enableBatchMessageMode);
 
     // Emit helix controller metrics
-    _controllerMetrics.addCallbackGauge("helix.connected", () -> 
_helixControllerManager.isConnected() ? 1L : 0L);
+    
_controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
+        () -> _helixControllerManager.isConnected() ? 1L : 0L);
     _controllerMetrics.addCallbackGauge("helix.leader", () -> 
_helixControllerManager.isLeader() ? 1L : 0L);
     _helixControllerManager.addPreConnectCallback(
         () -> 
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.HELIX_ZOOKEEPER_RECONNECTS,
 1L));
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index d127779..d671311 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -127,7 +127,9 @@ public abstract class ClusterTest extends ControllerTest {
             BrokerServerBuilder.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
       }
       overrideBrokerConf(brokerConf);
-      _brokerStarters.add(new HelixBrokerStarter(_clusterName, zkStr, 
brokerConf));
+      HelixBrokerStarter brokerStarter = new HelixBrokerStarter(brokerConf, 
_clusterName, zkStr);
+      brokerStarter.start();
+      _brokerStarters.add(brokerStarter);
     }
   }
 
@@ -225,8 +227,7 @@ public abstract class ClusterTest extends ControllerTest {
   protected void stopBroker() {
     for (HelixBrokerStarter brokerStarter : _brokerStarters) {
       try {
-        // TODO: replace with brokerStarter.shutdown() once they are hooked up
-        brokerStarter.getBrokerServerBuilder().stop();
+        brokerStarter.shutdown();
       } catch (Exception e) {
         LOGGER.error("Encountered exception while stopping broker {}", e.getMessage());
       }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index 64f7b32..5d3ed26 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -174,7 +174,8 @@ public class HelixServerStarter {
     _helixManager.getMessagingService()
         
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), 
messageHandlerFactory);
 
-    serverMetrics.addCallbackGauge("helix.connected", () -> 
_helixManager.isConnected() ? 1L : 0L);
+    
serverMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
+        () -> _helixManager.isConnected() ? 1L : 0L);
     _helixManager
         .addPreConnectCallback(() -> 
serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 
1L));
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
index 52b046f..1e6af76 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
@@ -41,7 +41,6 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
 
   @Option(name = "-brokerPort", required = false, metaVar = "<int>", usage = 
"Broker port number to use for query.")
   private int _brokerPort = CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
-  ;
 
   @Option(name = "-zkAddress", required = false, metaVar = "<http>", usage = 
"HTTP address of Zookeeper.")
   private String _zkAddress = DEFAULT_ZK_ADDRESS;
@@ -59,6 +58,8 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
     return _help;
   }
 
+  private HelixBrokerStarter _brokerStarter;
+
   @Override
   public String getName() {
     return "StartBroker";
@@ -75,7 +76,9 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
 
   @Override
   public void cleanup() {
-
+    if (_brokerStarter != null) {
+      _brokerStarter.shutdown();
+    }
   }
 
   @Override
@@ -119,7 +122,8 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
       }
 
       LOGGER.info("Executing command: " + toString());
-      new HelixBrokerStarter(_brokerHost, _clusterName, _zkAddress, 
brokerConf);
+      _brokerStarter = new HelixBrokerStarter(brokerConf, _clusterName, 
_zkAddress, _brokerHost);
+      _brokerStarter.start();
 
       String pidFile = ".pinotAdminBroker-" + System.currentTimeMillis() + 
".pid";
       savePID(System.getProperty("java.io.tmpdir") + File.separator + pidFile);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index 1df27e5..6d7928d 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -221,7 +221,7 @@ public class PerfBenchmarkDriver {
     brokerConf.setProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, 
brokerInstanceName);
     brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 
BROKER_TIMEOUT_MS);
     LOGGER.info("Starting broker instance: {}", brokerInstanceName);
-    new HelixBrokerStarter(_clusterName, _zkAddress, brokerConf);
+    new HelixBrokerStarter(brokerConf, _clusterName, _zkAddress).start();
   }
 
   private void startServer()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to