This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch broker_starter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1b537cb655339932b25f48001fa3b0b8f9e30a8f
Author: Jackie (Xiaotian) Jiang <[email protected]>
AuthorDate: Tue Apr 9 17:59:14 2019 -0700

    Refactor HelixBrokerStarter to separate constructor and start()
    
    For the starter class, the constructor should only set the properties,
    but not start the service, which should be done by start() instead.
    
    - Refactor HelixBrokerStarter to separate constructor and start()
    - Remove redundant configs, always use constant for configs
    - Modify shutdown() to shut down all components properly
    
    NOTE: THIS IS A BACKWARD-INCOMPATIBLE CHANGE
    - Need to call start() separately in order to start the broker
    - Order of arguments changed in constructor (nullable argument moved
      to the last position). This is intentional so that users can find
      the incompatibility more easily during compilation, and thus add
      the start() call
---
 .../pinot/broker/broker/BrokerServerBuilder.java   |   7 +-
 .../broker/broker/helix/ClusterChangeHandler.java  |   4 +
 .../broker/helix/DefaultHelixBrokerConfig.java     |  59 -----
 .../broker/broker/helix/HelixBrokerStarter.java    | 288 +++++++++------------
 .../pinot/broker/broker/BrokerTestUtils.java       |  51 ----
 .../broker/broker/HelixBrokerStarterTest.java      |  34 +--
 .../broker/broker/HelixBrokerStarterUtilsTest.java |  63 -----
 .../apache/pinot/common/utils/CommonConstants.java |   4 +
 .../pinot/integration/tests/ClusterTest.java       |  37 +--
 .../tests/NewConfigApplyIntegrationTest.java       |   3 +-
 .../tools/admin/command/StartBrokerCommand.java    |  22 +-
 .../pinot/tools/perf/PerfBenchmarkDriver.java      |   2 +-
 12 files changed, 188 insertions(+), 386 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
index 387e214..a63e954 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
@@ -109,11 +109,12 @@ public class BrokerServerBuilder {
     _state.set(State.STARTING);
 
     _brokerRequestHandler.start();
-    _brokerAdminApplication.start(_config
-        .getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));
+    int brokerQueryPort =
+        _config.getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT);
+    _brokerAdminApplication.start(brokerQueryPort);
 
     _state.set(State.RUNNING);
-    LOGGER.info("Pinot Broker started");
+    LOGGER.info("Pinot Broker is started and listening on port {} for API 
requests", brokerQueryPort);
   }
 
   public void stop() {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
index d4b82da..69a4524 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeHandler.java
@@ -19,11 +19,15 @@
 package org.apache.pinot.broker.broker.helix;
 
 import org.apache.helix.HelixConstants;
+import org.apache.pinot.annotations.InterfaceAudience;
+import org.apache.pinot.annotations.InterfaceStability;
 
 
 /**
  * Handles cluster changes such as external view changes, instance config 
changes, live instance changes etc.
  */
[email protected]
[email protected]
 public interface ClusterChangeHandler {
 
   /**
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java
deleted file mode 100644
index c264598..0000000
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker.helix;
-
-import java.util.Iterator;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-
-public class DefaultHelixBrokerConfig {
-  public static final String HELIX_FLAPPING_TIME_WINDOW_NAME = 
"pinot.broker.helix.flappingTimeWindowMs";
-  public static final String DEFAULT_HELIX_FLAPPING_TIMEIWINDWOW_MS = "0";
-
-  public static Configuration getDefaultBrokerConf() {
-    Configuration brokerConf = new PropertiesConfiguration();
-
-    // config based routing
-    brokerConf.addProperty("pinot.broker.transport.routingMode", "HELIX");
-
-    
brokerConf.addProperty("pinot.broker.routing.table.builder.default.offline.class",
 "balanced");
-    
brokerConf.addProperty("pinot.broker.routing.table.builder.default.offline.numOfRoutingTables",
 "10");
-    brokerConf.addProperty("pinot.broker.routing.table.builder.tables", "");
-
-    //client properties
-    brokerConf.addProperty("pinot.broker.client.queryPort", "8099");
-
-    // [PINOT-2435] setting to 0 so it doesn't disconnect from zk
-    brokerConf.addProperty("pinot.broker.helix.flappingTimeWindowMs", "0");
-
-    return brokerConf;
-  }
-
-  public static Configuration getDefaultBrokerConf(Configuration 
externalConfigs) {
-    final Configuration defaultConfigs = getDefaultBrokerConf();
-    @SuppressWarnings("unchecked")
-    Iterator<String> iterable = externalConfigs.getKeys();
-    while (iterable.hasNext()) {
-      String key = iterable.next();
-      defaultConfigs.setProperty(key, externalConfigs.getProperty(key));
-    }
-    return defaultConfigs;
-  }
-}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 337a6c3..8f98b0d 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -25,15 +25,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
@@ -47,189 +48,196 @@ import 
org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.ServiceStatus;
-import org.apache.pinot.common.utils.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-/**
- * Helix Broker Startable
- *
- *
- */
 public class HelixBrokerStarter {
-  private static final String PROPERTY_STORE = "PROPERTYSTORE";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixBrokerStarter.class);
+  private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = 
"pinot.broker.routing.table";
+
+  private final Configuration _properties;
+  private final String _clusterName;
+  private final String _zkServers;
+  private final String _brokerId;
 
   // Spectator Helix manager handles the custom change listeners, properties 
read/write
-  private final HelixManager _spectatorHelixManager;
+  private HelixManager _spectatorHelixManager;
   // Participant Helix manager handles Helix functionality such as state 
transitions and messages
-  private final HelixManager _participantHelixManager;
-
-  private final Configuration _pinotHelixProperties;
-  private final HelixAdmin _helixAdmin;
-  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final HelixDataAccessor _helixDataAccessor;
-  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private final BrokerServerBuilder _brokerServerBuilder;
-  private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
-  private final MetricsRegistry _metricsRegistry;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
-  private final ClusterChangeMediator _clusterChangeMediator;
-  private final TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
-
-  // Set after broker is started, which is actually in the constructor.
-  private AccessControlFactory _accessControlFactory;
+  private HelixManager _participantHelixManager;
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixBrokerStarter.class);
-
-  private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = 
"pinot.broker.routing.table";
+  private HelixAdmin _helixAdmin;
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private HelixDataAccessor _helixDataAccessor;
+  private HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
+  private BrokerServerBuilder _brokerServerBuilder;
+  private AccessControlFactory _accessControlFactory;
+  private MetricsRegistry _metricsRegistry;
+  private ClusterChangeMediator _clusterChangeMediator;
+  private TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
 
-  public HelixBrokerStarter(String helixClusterName, String zkServer, 
Configuration pinotHelixProperties)
+  public HelixBrokerStarter(Configuration properties, String clusterName, 
String zkServer)
       throws Exception {
-    this(null, helixClusterName, zkServer, pinotHelixProperties);
+    this(properties, clusterName, zkServer, null);
   }
 
-  public HelixBrokerStarter(String brokerHost, String helixClusterName, String 
zkServer,
-      Configuration pinotHelixProperties)
+  public HelixBrokerStarter(Configuration properties, String clusterName, 
String zkServer, @Nullable String brokerHost)
       throws Exception {
-    LOGGER.info("Starting Pinot broker");
+    _properties = properties;
+    setupHelixSystemProperties();
 
-    _pinotHelixProperties = 
DefaultHelixBrokerConfig.getDefaultBrokerConf(pinotHelixProperties);
+    _clusterName = clusterName;
+
+    // Remove all white-spaces from the list of zkServers (if any).
+    _zkServers = zkServer.replaceAll("\\s+", "");
 
     if (brokerHost == null) {
       brokerHost = NetUtil.getHostAddress();
     }
-
-    final String brokerId = 
_pinotHelixProperties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
-        CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + 
_pinotHelixProperties
+    _brokerId = 
_properties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
+        CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + 
_properties
             .getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));
+    _properties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, 
_brokerId);
+  }
 
-    
_pinotHelixProperties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, 
brokerId);
-    setupHelixSystemProperties();
+  private void setupHelixSystemProperties() {
+    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _properties
+        
.getString(CommonConstants.Broker.CONFIG_OF_HELIX_FLAPPING_TIME_WINDOW_MS,
+            CommonConstants.Broker.DEFAULT_HELIX_FLAPPING_TIME_WINDOW_MS));
+  }
 
-    // Remove all white-spaces from the list of zkServers (if any).
-    String zkServers = zkServer.replaceAll("\\s+", "");
+  public void start()
+      throws Exception {
+    LOGGER.info("Starting Pinot broker");
 
-    LOGGER.info("Connecting Helix components");
-    // Connect spectator Helix manager.
+    // Connect the spectator Helix manager
+    LOGGER.info("Connecting spectator Helix manager");
     _spectatorHelixManager =
-        HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId, 
InstanceType.SPECTATOR, zkServers);
+        HelixManagerFactory.getZKHelixManager(_clusterName, _brokerId, 
InstanceType.SPECTATOR, _zkServers);
     _spectatorHelixManager.connect();
     _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
     _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
     _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
     _helixExternalViewBasedRouting = new 
HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
-        pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
-    _tableQueryQuotaManager = new 
TableQueryQuotaManager(_spectatorHelixManager);
-    _liveInstanceChangeHandler = new 
LiveInstanceChangeHandler(_spectatorHelixManager);
-    _brokerServerBuilder = startBroker(_pinotHelixProperties);
+        _properties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
+    TableQueryQuotaManager tableQueryQuotaManager = new 
TableQueryQuotaManager(_spectatorHelixManager);
+    LiveInstanceChangeHandler liveInstanceChangeHandler = new 
LiveInstanceChangeHandler(_spectatorHelixManager);
+
+    // Set up the broker server builder
+    LOGGER.info("Setting up broker server builder");
+    _brokerServerBuilder = new BrokerServerBuilder(_properties, 
_helixExternalViewBasedRouting,
+        _helixExternalViewBasedRouting.getTimeBoundaryService(), 
liveInstanceChangeHandler, tableQueryQuotaManager);
+    _accessControlFactory = _brokerServerBuilder.getAccessControlFactory();
     _metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
+    BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics();
+    _helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics);
+    tableQueryQuotaManager.setBrokerMetrics(brokerMetrics);
+    _brokerServerBuilder.start();
 
-    // Initialize cluster change mediator
+    // Initialize the cluster change mediator
+    LOGGER.info("Initializing cluster change mediator");
     Map<ChangeType, List<ClusterChangeHandler>> changeHandlersMap = new 
HashMap<>();
     List<ClusterChangeHandler> externalViewChangeHandlers = new ArrayList<>();
     externalViewChangeHandlers.add(_helixExternalViewBasedRouting);
-    externalViewChangeHandlers.add(_tableQueryQuotaManager);
+    externalViewChangeHandlers.add(tableQueryQuotaManager);
     
externalViewChangeHandlers.addAll(getCustomExternalViewChangeHandlers(_spectatorHelixManager));
     changeHandlersMap.put(ChangeType.EXTERNAL_VIEW, 
externalViewChangeHandlers);
     List<ClusterChangeHandler> instanceConfigChangeHandlers = new 
ArrayList<>();
     instanceConfigChangeHandlers.add(_helixExternalViewBasedRouting);
     
instanceConfigChangeHandlers.addAll(getCustomInstanceConfigChangeHandlers(_spectatorHelixManager));
     changeHandlersMap.put(ChangeType.INSTANCE_CONFIG, 
instanceConfigChangeHandlers);
-    List<ClusterChangeHandler> liveInstanceChangeHandler = new ArrayList<>();
-    liveInstanceChangeHandler.add(_liveInstanceChangeHandler);
-    
liveInstanceChangeHandler.addAll(getCustomLiveInstanceChangeHandlers(_spectatorHelixManager));
-    changeHandlersMap.put(ChangeType.LIVE_INSTANCE, liveInstanceChangeHandler);
-    _clusterChangeMediator = new ClusterChangeMediator(changeHandlersMap, 
_brokerServerBuilder.getBrokerMetrics());
+    List<ClusterChangeHandler> liveInstanceChangeHandlers = new ArrayList<>();
+    liveInstanceChangeHandlers.add(liveInstanceChangeHandler);
+    
liveInstanceChangeHandlers.addAll(getCustomLiveInstanceChangeHandlers(_spectatorHelixManager));
+    changeHandlersMap.put(ChangeType.LIVE_INSTANCE, 
liveInstanceChangeHandlers);
+    _clusterChangeMediator = new ClusterChangeMediator(changeHandlersMap, 
brokerMetrics);
     _clusterChangeMediator.start();
     
_spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
     
_spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
     
_spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
 
-    // Connect participant Helix manager.
+    // Connect the participant Helix manager
+    LOGGER.info("Connecting participant Helix manager");
     _participantHelixManager =
-        HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId, 
InstanceType.PARTICIPANT, zkServers);
+        HelixManagerFactory.getZKHelixManager(_clusterName, _brokerId, 
InstanceType.PARTICIPANT, _zkServers);
     StateMachineEngine stateMachineEngine = 
_participantHelixManager.getStateMachineEngine();
     StateModelFactory<?> stateModelFactory =
         new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, 
_helixDataAccessor,
-            _helixExternalViewBasedRouting, _tableQueryQuotaManager);
+            _helixExternalViewBasedRouting, tableQueryQuotaManager);
     stateMachineEngine
         
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
 stateModelFactory);
     _participantHelixManager.connect();
-    _tbiMessageHandler = new 
TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting,
-        
_pinotHelixProperties.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
+    _tbiMessageHandler = new 
TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting, 
_properties
+        
.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
             
CommonConstants.Broker.DEFAULT_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL_MS));
     _participantHelixManager.getMessagingService()
         
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), 
_tbiMessageHandler);
-
-    addInstanceTagIfNeeded(helixClusterName, brokerId);
+    addInstanceTagIfNeeded();
+    brokerMetrics.addCallbackGauge("helix.connected", () -> 
_participantHelixManager.isConnected() ? 1L : 0L);
+    _participantHelixManager
+        .addPreConnectCallback(() -> 
brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 
1L));
 
     // Register the service status handler
-    double minResourcePercentForStartup = _pinotHelixProperties
+    LOGGER.info("Registering service status handler");
+    double minResourcePercentForStartup = _properties
         
.getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
             
CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
     ServiceStatus.setServiceStatusCallback(new 
ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
         .of(new 
ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_participantHelixManager,
-                helixClusterName, brokerId, minResourcePercentForStartup),
+                _clusterName, _brokerId, minResourcePercentForStartup),
             new 
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager,
-                helixClusterName, brokerId, minResourcePercentForStartup))));
-
-    _brokerServerBuilder.getBrokerMetrics()
-        .addCallbackGauge("helix.connected", () -> 
_participantHelixManager.isConnected() ? 1L : 0L);
+                _clusterName, _brokerId, minResourcePercentForStartup))));
 
-    _participantHelixManager.addPreConnectCallback(() -> 
_brokerServerBuilder.getBrokerMetrics()
-        .addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
+    LOGGER.info("Finish starting Pinot broker");
   }
 
-  private void setupHelixSystemProperties() {
-    final String helixFlappingTimeWindowPropName = 
"helixmanager.flappingTimeWindow";
-    System.setProperty(helixFlappingTimeWindowPropName, _pinotHelixProperties
-        .getString(DefaultHelixBrokerConfig.HELIX_FLAPPING_TIME_WINDOW_NAME,
-            DefaultHelixBrokerConfig.DEFAULT_HELIX_FLAPPING_TIMEIWINDWOW_MS));
-  }
-
-  private void addInstanceTagIfNeeded(String clusterName, String instanceName) 
{
+  private void addInstanceTagIfNeeded() {
     InstanceConfig instanceConfig =
-        
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(instanceName));
+        
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(_brokerId));
     List<String> instanceTags = instanceConfig.getTags();
     if (instanceTags == null || instanceTags.isEmpty()) {
       if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) 
{
-        _helixAdmin.addInstanceTag(clusterName, instanceName,
+        _helixAdmin.addInstanceTag(_clusterName, _brokerId,
             
TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
       } else {
-        _helixAdmin.addInstanceTag(clusterName, instanceName, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+        _helixAdmin.addInstanceTag(_clusterName, _brokerId, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
       }
     }
   }
 
-  private BrokerServerBuilder startBroker(Configuration config) {
-    if (config == null) {
-      config = DefaultHelixBrokerConfig.getDefaultBrokerConf();
+  public void shutdown() {
+    LOGGER.info("Shutting down Pinot broker");
+
+    if (_tbiMessageHandler != null) {
+      LOGGER.info("Shutting down time boundary info refresh message handler");
+      _tbiMessageHandler.shutdown();
     }
-    BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, 
_helixExternalViewBasedRouting,
-        _helixExternalViewBasedRouting.getTimeBoundaryService(), 
_liveInstanceChangeHandler, _tableQueryQuotaManager);
-    _accessControlFactory = brokerServerBuilder.getAccessControlFactory();
-    
_helixExternalViewBasedRouting.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
-    
_tableQueryQuotaManager.setBrokerMetrics(brokerServerBuilder.getBrokerMetrics());
-    brokerServerBuilder.start();
-
-    LOGGER.info("Pinot broker ready and listening on port {} for API requests",
-        config.getProperty("pinot.broker.client.queryPort"));
-
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        try {
-          brokerServerBuilder.stop();
-        } catch (final Exception e) {
-          LOGGER.error("Caught exception while running shutdown hook", e);
-        }
-      }
-    });
-    return brokerServerBuilder;
+
+    if (_participantHelixManager != null) {
+      LOGGER.info("Disconnecting participant Helix manager");
+      _participantHelixManager.disconnect();
+    }
+
+    if (_clusterChangeMediator != null) {
+      LOGGER.info("Stopping cluster change mediator");
+      _clusterChangeMediator.stop();
+    }
+
+    if (_brokerServerBuilder != null) {
+      LOGGER.info("Stopping broker server builder");
+      _brokerServerBuilder.stop();
+    }
+
+    if (_spectatorHelixManager != null) {
+      LOGGER.info("Disconnecting spectator Helix manager");
+      _spectatorHelixManager.disconnect();
+    }
+
+    LOGGER.info("Finish shutting down Pinot broker");
   }
 
   /**
@@ -275,32 +283,6 @@ public class HelixBrokerStarter {
     return _accessControlFactory;
   }
 
-  /**
-   * The zk string format should be 127.0.0.1:3000,127.0.0.1:3001/app/a which 
applies
-   * the /helixClusterName/PROPERTY_STORE after chroot to all servers.
-   * Expected output for this method is:
-   * 
127.0.0.1:3000/app/a/helixClusterName/PROPERTY_STORE,127.0.0.1:3001/app/a/helixClusterName/PROPERTY_STORE
-   *
-   * @param zkServers
-   * @param helixClusterName
-   * @return the full property store path
-   *
-   * @see org.apache.zookeeper.ZooKeeper#ZooKeeper(String, int, 
org.apache.zookeeper.Watcher)
-   */
-  public static String getZkAddressForBroker(String zkServers, String 
helixClusterName) {
-    List tokens = new ArrayList<String>();
-    String[] zkSplit = zkServers.split("/", 2);
-    String zkHosts = zkSplit[0];
-    String zkPathSuffix = StringUtil.join("/", helixClusterName, 
PROPERTY_STORE);
-    if (zkSplit.length > 1) {
-      zkPathSuffix = zkSplit[1] + "/" + zkPathSuffix;
-    }
-    for (String token : zkHosts.split(",")) {
-      tokens.add(StringUtil.join("/", StringUtils.chomp(token, "/"), 
zkPathSuffix));
-    }
-    return StringUtils.join(tokens, ",");
-  }
-
   public HelixManager getSpectatorHelixManager() {
     return _spectatorHelixManager;
   }
@@ -313,45 +295,21 @@ public class HelixBrokerStarter {
     return _brokerServerBuilder;
   }
 
-  public static HelixBrokerStarter startDefault()
-      throws Exception {
-    Configuration configuration = new PropertiesConfiguration();
-    int port = 5001;
-    configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
port);
-    configuration.addProperty("pinot.broker.timeoutMs", 500 * 1000L);
-
-    final HelixBrokerStarter pinotHelixBrokerStarter =
-        new HelixBrokerStarter(null, "quickstart", "localhost:2122", 
configuration);
-    return pinotHelixBrokerStarter;
-  }
-
-  public void shutdown() {
-    LOGGER.info("Shutting down");
-
-    if (_participantHelixManager != null) {
-      LOGGER.info("Disconnecting participant Helix manager");
-      _participantHelixManager.disconnect();
-    }
-
-    if (_spectatorHelixManager != null) {
-      LOGGER.info("Disconnecting spectator Helix manager");
-      _spectatorHelixManager.disconnect();
-    }
-
-    if (_tbiMessageHandler != null) {
-      LOGGER.info("Shutting down timeboundary info refresh message handler");
-      _tbiMessageHandler.shutdown();
-    }
-
-    _clusterChangeMediator.stop();
-  }
-
   public MetricsRegistry getMetricsRegistry() {
     return _metricsRegistry;
   }
 
+  public static HelixBrokerStarter getDefault()
+      throws Exception {
+    Configuration properties = new PropertiesConfiguration();
+    int port = 5001;
+    properties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
port);
+    properties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 
30 * 1000L);
+    return new HelixBrokerStarter(properties, "quickstart", "localhost:2122");
+  }
+
   public static void main(String[] args)
       throws Exception {
-    startDefault();
+    getDefault().start();
   }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java
deleted file mode 100644
index f863e56..0000000
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig;
-import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
-
-
-/**
- * Utilities to start a broker during unit tests.
- *
- */
-public class BrokerTestUtils {
-  public static Configuration getDefaultBrokerConfiguration() {
-    return DefaultHelixBrokerConfig.getDefaultBrokerConf();
-  }
-
-  public static HelixBrokerStarter startBroker(final String clusterName, final 
String zkStr,
-      final Configuration configuration) {
-    try {
-      return new HelixBrokerStarter(clusterName, zkStr, configuration);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static void stopBroker(final HelixBrokerStarter brokerStarter) {
-    try {
-      brokerStarter.getBrokerServerBuilder().stop();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-}
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 08387a8..7436a89 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -30,9 +30,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
 import org.apache.pinot.broker.routing.TimeBoundaryService;
@@ -57,7 +57,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
   private static final String RAW_DINING_TABLE_NAME = "dining";
   private static final String DINING_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME);
   private static final String COFFEE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType("coffee");
-  private final Configuration _pinotHelixBrokerProperties = 
DefaultHelixBrokerConfig.getDefaultBrokerConf();
+
+  private final Configuration _properties = new PropertiesConfiguration();
 
   private ZkClient _zkClient;
   private HelixBrokerStarter _helixBrokerStarter;
@@ -71,11 +72,10 @@ public class HelixBrokerStarterTest extends ControllerTest {
 
     startController();
 
-    
_pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT,
 8943);
-    _pinotHelixBrokerProperties
-        
.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
 100L);
-    _helixBrokerStarter =
-        new HelixBrokerStarter(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, _pinotHelixBrokerProperties);
+    _properties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
8943);
+    
_properties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
 100L);
+    _helixBrokerStarter = new HelixBrokerStarter(_properties, 
getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR);
+    _helixBrokerStarter.start();
 
     ControllerRequestBuilderUtil
         .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, 5, true);
@@ -139,8 +139,10 @@ public class HelixBrokerStarterTest extends ControllerTest 
{
       throws Exception {
     IdealState idealState;
 
-    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_BROKER").size(), 6);
-    idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_BROKER").size(),
+        6);
+    idealState =
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), 
SEGMENT_COUNT);
 
     ExternalView externalView =
@@ -173,8 +175,10 @@ public class HelixBrokerStarterTest extends ControllerTest 
{
         .setBrokerTenant("testBroker").setServerTenant("testServer").build();
     _helixResourceManager.addTable(tableConfig);
 
-    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_BROKER").size(), 6);
-    idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_BROKER").size(),
+        6);
+    idealState =
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     Assert.assertEquals(idealState.getInstanceSet(COFFEE_TABLE_NAME).size(), 
SEGMENT_COUNT);
     Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), 
SEGMENT_COUNT);
 
@@ -183,8 +187,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
       @Override
       public Boolean call()
           throws Exception {
-        return _helixAdmin.getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
-            .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
+        return
+            _helixAdmin.getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+                .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
       }
     }, 30000L);
 
@@ -273,8 +278,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
       TimeBoundaryService.TimeBoundaryInfo timeBoundaryInfo = 
_helixBrokerStarter.getHelixExternalViewBasedRouting().
           getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
       return currentTimeBoundary < 
Long.parseLong(timeBoundaryInfo.getTimeValue());
-    }, 5 * _pinotHelixBrokerProperties
-        
.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL));
+    }, 5 * 
_properties.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL));
     tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting().
         getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
     Assert.assertTrue(currentTimeBoundary < 
Long.parseLong(tbi.getTimeValue()));
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java
deleted file mode 100644
index f080f0e..0000000
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class HelixBrokerStarterUtilsTest {
-
-  @Test
-  public void testZkParserUtil1() {
-    String zkServers = "hostname1,hostname2";
-    String zkAddressForBroker = 
HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
-    String expectedZkAddressForBroker =
-        
"hostname1/helixClusterName/PROPERTYSTORE,hostname2/helixClusterName/PROPERTYSTORE";
-    Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
-  }
-
-  @Test
-  public void testZkParserUtil2() {
-    String zkServers = "hostname1,hostname2/chroot1/chroot2";
-    String zkAddressForBroker = 
HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
-    String expectedZkAddressForBroker =
-        
"hostname1/chroot1/chroot2/helixClusterName/PROPERTYSTORE,hostname2/chroot1/chroot2/helixClusterName/PROPERTYSTORE";
-    Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
-  }
-
-  @Test
-  public void testZkParserUtil3() {
-    String zkServers = "hostname1:2181,hostname2:2181";
-    String zkAddressForBroker = 
HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
-    String expectedZkAddressForBroker =
-        
"hostname1:2181/helixClusterName/PROPERTYSTORE,hostname2:2181/helixClusterName/PROPERTYSTORE";
-    Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
-  }
-
-  @Test
-  public void testZkParserUtil4() {
-    String zkServers = "hostname1:2181,hostname2:2181/chroot1/chroot2";
-    String zkAddressForBroker = 
HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
-    String expectedZkAddressForBroker =
-        
"hostname1:2181/chroot1/chroot2/helixClusterName/PROPERTYSTORE,hostname2:2181/chroot1/chroot2/helixClusterName/PROPERTYSTORE";
-    Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
-  }
-}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 137288c..d1e74e6 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -130,6 +130,10 @@ public class CommonConstants {
     public static final String CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START 
= "pinot.broker.startup.minResourcePercent";
     public static final double DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START = 
100.0;
 
+    // [PINOT-2435] setting to 0 so it doesn't disconnect from zk
+    public static final String CONFIG_OF_HELIX_FLAPPING_TIME_WINDOW_MS = 
"pinot.broker.helix.flappingTimeWindowMs";
+    public static final String DEFAULT_HELIX_FLAPPING_TIME_WINDOW_MS = "0";
+
     public static class Request {
       public static final String PQL = "pql";
       public static final String TRACE = "trace";
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index f677be3..cc0b4d9 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -44,13 +44,13 @@ import 
org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.http.HttpStatus;
 import org.apache.pinot.broker.broker.BrokerServerBuilder;
-import org.apache.pinot.broker.broker.BrokerTestUtils;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TableTaskConfig;
 import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.CommonConstants.Minion;
 import org.apache.pinot.common.utils.CommonConstants.Server;
@@ -96,33 +96,38 @@ public abstract class ClusterTest extends ControllerTest {
   protected TableConfig _offlineTableConfig;
   protected TableConfig _realtimeTableConfig;
 
-  protected void startBroker() {
+  protected void startBroker()
+      throws Exception {
     startBrokers(1);
   }
 
-  protected void startBroker(int basePort, String zkStr) {
+  protected void startBroker(int basePort, String zkStr)
+      throws Exception {
     startBrokers(1, basePort, zkStr);
   }
 
-  protected void startBrokers(int numBrokers) {
+  protected void startBrokers(int numBrokers)
+      throws Exception {
     startBrokers(numBrokers, DEFAULT_BROKER_PORT, ZkStarter.DEFAULT_ZK_STR);
   }
 
-  protected void startBrokers(int numBrokers, int basePort, String zkStr) {
+  protected void startBrokers(int numBrokers, int basePort, String zkStr)
+      throws Exception {
     _brokerBaseApiUrl = "http://localhost:" + basePort;
     for (int i = 0; i < numBrokers; i++) {
-      Configuration configuration = 
BrokerTestUtils.getDefaultBrokerConfiguration();
-      configuration.setProperty("pinot.broker.timeoutMs", 100 * 1000L);
-      configuration.setProperty("pinot.broker.client.queryPort", 
Integer.toString(basePort + i));
-      configuration.setProperty("pinot.broker.routing.table.builder.class", 
"random");
-      
configuration.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0);
+      Configuration properties = new PropertiesConfiguration();
+      
properties.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 100 
* 1000L);
+      properties.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, 
Integer.toString(basePort + i));
+      
properties.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0);
       // Randomly choose to use connection-pool or single-connection request 
handler
       if (RANDOM.nextBoolean()) {
-        
configuration.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG,
+        properties.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG,
             BrokerServerBuilder.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
       }
-      overrideBrokerConf(configuration);
-      _brokerStarters.add(BrokerTestUtils.startBroker(_clusterName, zkStr, 
configuration));
+      overrideBrokerConf(properties);
+      HelixBrokerStarter brokerStarter = new HelixBrokerStarter(properties, 
_clusterName, zkStr);
+      brokerStarter.start();
+      _brokerStarters.add(brokerStarter);
     }
   }
 
@@ -219,11 +224,7 @@ public abstract class ClusterTest extends ControllerTest {
 
   protected void stopBroker() {
     for (HelixBrokerStarter brokerStarter : _brokerStarters) {
-      try {
-        BrokerTestUtils.stopBroker(brokerStarter);
-      } catch (Exception e) {
-        LOGGER.error("Encountered exception while stopping broker {}", 
e.getMessage());
-      }
+      brokerStarter.shutdown();
     }
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
index 5fb52cc..8f0045d 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
@@ -43,7 +43,8 @@ public class NewConfigApplyIntegrationTest extends 
BaseClusterIntegrationTest {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(NewConfigApplyIntegrationTest.class);
 
   @BeforeClass
-  public void setUp() {
+  public void setUp()
+      throws Exception {
     // Start an empty cluster
     startZk();
     startController();
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
index 60bcf2c..01d0cb5 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
@@ -41,7 +41,6 @@ public class StartBrokerCommand extends 
AbstractBaseAdminCommand implements Comm
 
   @Option(name = "-brokerPort", required = false, metaVar = "<int>", usage = 
"Broker port number to use for query.")
   private int _brokerPort = CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
-  ;
 
   @Option(name = "-zkAddress", required = false, metaVar = "<http>", usage = 
"HTTP address of Zookeeper.")
   private String _zkAddress = DEFAULT_ZK_ADDRESS;
@@ -59,6 +58,8 @@ public class StartBrokerCommand extends 
AbstractBaseAdminCommand implements Comm
     return _help;
   }
 
+  private HelixBrokerStarter _brokerStarter;
+
   @Override
   public String getName() {
     return "StartBroker";
@@ -75,7 +76,9 @@ public class StartBrokerCommand extends 
AbstractBaseAdminCommand implements Comm
 
   @Override
   public void cleanup() {
-
+    if (_brokerStarter != null) {
+      _brokerStarter.shutdown();
+    }
   }
 
   @Override
@@ -107,23 +110,22 @@ public class StartBrokerCommand extends 
AbstractBaseAdminCommand implements Comm
   public boolean execute()
       throws Exception {
     try {
-      Configuration configuration = readConfigFromFile(_configFileName);
-      if (configuration == null) {
+      Configuration properties = readConfigFromFile(_configFileName);
+      if (properties == null) {
         if (_configFileName != null) {
           LOGGER.error("Error: Unable to find file {}.", _configFileName);
           return false;
         }
 
-        configuration = new PropertiesConfiguration();
-        
configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
_brokerPort);
-        configuration.setProperty("pinot.broker.routing.table.builder.class", 
"random");
+        properties = new PropertiesConfiguration();
+        properties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 
_brokerPort);
       }
 
       LOGGER.info("Executing command: " + toString());
-      final HelixBrokerStarter pinotHelixBrokerStarter =
-          new HelixBrokerStarter(_brokerHost, _clusterName, _zkAddress, 
configuration);
+      _brokerStarter = new HelixBrokerStarter(properties, _clusterName, 
_zkAddress, _brokerHost);
+      _brokerStarter.start();
 
-      String pidFile = ".pinotAdminBroker-" + 
String.valueOf(System.currentTimeMillis()) + ".pid";
+      String pidFile = ".pinotAdminBroker-" + System.currentTimeMillis() + 
".pid";
       savePID(System.getProperty("java.io.tmpdir") + File.separator + pidFile);
       return true;
     } catch (Exception e) {
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index c8a6005..ae28912 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -214,7 +214,7 @@ public class PerfBenchmarkDriver {
     
brokerConfiguration.setProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, 
brokerInstanceName);
     
brokerConfiguration.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
 BROKER_TIMEOUT_MS);
     LOGGER.info("Starting broker instance: {}", brokerInstanceName);
-    new HelixBrokerStarter(_clusterName, _zkAddress, brokerConfiguration);
+    new HelixBrokerStarter(brokerConfiguration, _clusterName, 
_zkAddress).start();
   }
 
   private void startServer()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to