This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch helix_setup
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit db6fdf8da0321d1a1ef283bdb3becd64ad6c4068
Author: Jackie (Xiaotian) Jiang <[email protected]>
AuthorDate: Thu Jul 11 18:20:42 2019 -0700

    Clean up unnecessary setups in HelixSetupUtils
    
    - Do not enable leadControllerResource by default (no auto-rebalance)
    - Remove redundant empty brokerResource resource config
    - Remove redundant empty leadControllerResource resource config
    - No need to initialize property store sub-directories
---
 .../apache/pinot/controller/ControllerStarter.java |  20 +-
 .../helix/core/util/HelixSetupUtils.java           | 213 +++++++-------------
 .../helix/ControllerPeriodicTaskStarterTest.java   |  38 +---
 .../pinot/controller/helix/ControllerTest.java     |  54 ++---
 .../controller/helix/PinotControllerModeTest.java  | 217 +++++++++++----------
 .../helix/core/PinotHelixResourceManagerTest.java  | 107 +++++-----
 .../test/java/org/apache/pinot/util/TestUtils.java |  39 ++--
 7 files changed, 284 insertions(+), 404 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index c313616..ee2fda1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.yammer.metrics.core.MetricsRegistry;
@@ -31,6 +32,7 @@ import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
@@ -219,7 +221,7 @@ public class ControllerStarter {
   private void setUpHelixController() {
     // Register and connect instance as Helix controller.
     LOGGER.info("Starting Helix controller");
-    _helixControllerManager = HelixSetupUtils.setup(_helixClusterName, 
_helixZkURL, _instanceId);
+    _helixControllerManager = 
HelixSetupUtils.setupHelixController(_helixClusterName, _helixZkURL, 
_instanceId);
 
     // Emit helix controller metrics
     
_controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
@@ -236,12 +238,10 @@ public class ControllerStarter {
   }
 
   private void setUpPinotController() {
-    // Note: Right now we don't allow pinot-only mode to be used in production 
yet.
-    // Now we only have this mode used in tests.
-    // TODO: Remove this logic once all the helix separation PRs are committed.
-    if (_controllerMode == ControllerConf.ControllerMode.PINOT_ONLY && 
!isPinotOnlyModeSupported()) {
-      throw new RuntimeException("Pinot only controller currently isn't 
supported in production yet.");
-    }
+    // Note: Right now we don't allow Pinot-only controller as 
ControllerLeadershipManager is setup in Helix controller
+    //       and Pinot controller relies on it
+    // TODO: Remove ControllerLeadershipManager
+    Preconditions.checkState(_controllerLeadershipManager != null);
 
     // Set up Pinot cluster in Helix
     HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, 
_isUpdateStateModel, _enableBatchMessageMode);
@@ -500,16 +500,14 @@ public class ControllerStarter {
       LOGGER.info("Stopping resource manager");
       _helixResourceManager.stop();
 
+      LOGGER.info("Shutting down executor service");
       _executorService.shutdownNow();
+      _executorService.awaitTermination(10L, TimeUnit.SECONDS);
     } catch (final Exception e) {
       LOGGER.error("Caught exception while shutting down", e);
     }
   }
 
-  public boolean isPinotOnlyModeSupported() {
-    return false;
-  }
-
   public MetricsRegistry getMetricsRegistry() {
     return _metricsRegistry;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index 82c4cab..dfa4979 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -20,16 +20,10 @@ package org.apache.pinot.controller.helix.core.util;
 
 import com.google.common.base.Preconditions;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
-import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.HelixControllerMain;
 import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -37,44 +31,35 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.helix.HelixHelper;
 import 
org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
 import 
org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.pinot.common.utils.CommonConstants.Helix.ENABLE_DELAY_REBALANCE;
-import static 
org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
-import static 
org.apache.pinot.common.utils.CommonConstants.Helix.MIN_ACTIVE_REPLICAS;
-import static 
org.apache.pinot.common.utils.CommonConstants.Helix.REBALANCE_DELAY_MS;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.*;
 
 
-/**
- * HelixSetupUtils handles how to create or get a helixCluster in controller.
- *
- *
- */
 public class HelixSetupUtils {
+  private HelixSetupUtils() {
+  }
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixSetupUtils.class);
 
-  public static HelixManager setup(String helixClusterName, String zkPath, 
String helixControllerInstanceId) {
+  public static HelixManager setupHelixController(String helixClusterName, 
String zkPath, String instanceId) {
     setupHelixClusterIfNeeded(helixClusterName, zkPath);
-    return startHelixControllerInStandaloneMode(helixClusterName, zkPath, 
helixControllerInstanceId);
+    return HelixControllerMain
+        .startHelixController(zkPath, helixClusterName, instanceId, 
HelixControllerMain.STANDALONE);
   }
 
-  /**
-   * Set up a brand new Helix cluster if it doesn't exist.
-   */
   private static void setupHelixClusterIfNeeded(String helixClusterName, 
String zkPath) {
     HelixAdmin admin = new ZKHelixAdmin(zkPath);
     if (admin.getClusters().contains(helixClusterName)) {
@@ -90,162 +75,106 @@ public class HelixSetupUtils {
     }
   }
 
-  private static HelixManager startHelixControllerInStandaloneMode(String 
helixClusterName, String zkUrl,
-      String pinotControllerInstanceId) {
-    LOGGER.info("Starting Helix Standalone Controller ... ");
-    return HelixControllerMain
-        .startHelixController(zkUrl, helixClusterName, 
pinotControllerInstanceId, HelixControllerMain.STANDALONE);
-  }
-
-  /**
-   * Customizes existing Helix cluster to run Pinot components.
-   */
   public static void setupPinotCluster(String helixClusterName, String zkPath, 
boolean isUpdateStateModel,
       boolean enableBatchMessageMode) {
-    final HelixAdmin admin = new ZKHelixAdmin(zkPath);
-    Preconditions.checkState(admin.getClusters().contains(helixClusterName),
-        String.format("Helix cluster: %s hasn't been set up", 
helixClusterName));
-
-    // Add segment state model definition if needed
-    addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, 
isUpdateStateModel);
-
-    // Add broker resource if needed
-    createBrokerResourceIfNeeded(helixClusterName, admin, 
enableBatchMessageMode);
-
-    // Add lead controller resource if needed
-    createLeadControllerResourceIfNeeded(helixClusterName, admin, 
enableBatchMessageMode);
-
-    // Init property store if needed
-    initPropertyStoreIfNeeded(helixClusterName, zkPath);
+    HelixZkClient zkClient = null;
+    try {
+      zkClient = SharedZkClientFactory.getInstance().buildZkClient(new 
HelixZkClient.ZkConnectionConfig(zkPath),
+          new HelixZkClient.ZkClientConfig().setZkSerializer(new 
ZNRecordSerializer())
+              
.setConnectInitTimeout(TimeUnit.SECONDS.toMillis(ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC)));
+      zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, 
TimeUnit.SECONDS);
+      HelixAdmin helixAdmin = new ZKHelixAdmin(zkClient);
+      HelixDataAccessor helixDataAccessor =
+          new ZKHelixDataAccessor(helixClusterName, new 
ZkBaseDataAccessor<>(zkClient));
+
+      
Preconditions.checkState(helixAdmin.getClusters().contains(helixClusterName),
+          String.format("Helix cluster: %s hasn't been set up", 
helixClusterName));
+
+      // Add segment state model definition if needed
+      addSegmentStateModelDefinitionIfNeeded(helixClusterName, helixAdmin, 
helixDataAccessor, isUpdateStateModel);
+
+      // Add broker resource if needed
+      createBrokerResourceIfNeeded(helixClusterName, helixAdmin, 
enableBatchMessageMode);
+
+      // Add lead controller resource if needed
+      createLeadControllerResourceIfNeeded(helixClusterName, helixAdmin, 
enableBatchMessageMode);
+    } finally {
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    }
   }
 
-  private static void addSegmentStateModelDefinitionIfNeeded(String 
helixClusterName, HelixAdmin admin, String zkPath,
-      boolean isUpdateStateModel) {
-    final String segmentStateModelName =
+  private static void addSegmentStateModelDefinitionIfNeeded(String 
helixClusterName, HelixAdmin helixAdmin,
+      HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) {
+    String segmentStateModelName =
         
PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
-    StateModelDefinition stateModelDefinition = 
admin.getStateModelDef(helixClusterName, segmentStateModelName);
-    if (stateModelDefinition == null) {
-      LOGGER.info("Adding state model {} (with CONSUMED state) generated using 
{}", segmentStateModelName,
-          PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
-      admin.addStateModelDef(helixClusterName, segmentStateModelName,
-          
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-    } else if (isUpdateStateModel) {
-      final StateModelDefinition curStateModelDef = 
admin.getStateModelDef(helixClusterName, segmentStateModelName);
-      List<String> states = curStateModelDef.getStatesPriorityList();
-      if 
(states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE))
 {
-        LOGGER.info("State model {} already updated to contain CONSUMING 
state", segmentStateModelName);
+    StateModelDefinition stateModelDefinition = 
helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName);
+    if (stateModelDefinition == null || isUpdateStateModel) {
+      if (stateModelDefinition == null) {
+        LOGGER.info("Adding state model: {} with CONSUMING state", 
segmentStateModelName);
       } else {
-        LOGGER.info("Updating {} to add states for low level consumers", 
segmentStateModelName);
-        StateModelDefinition newStateModelDef =
-            
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
-        ZkClient zkClient = new ZkClient(zkPath);
-        
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC,
 TimeUnit.SECONDS);
-        zkClient.setZkSerializer(new ZNRecordSerializer());
-        HelixDataAccessor accessor = new ZKHelixDataAccessor(helixClusterName, 
new ZkBaseDataAccessor<>(zkClient));
-        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-        accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), 
newStateModelDef);
-        LOGGER.info("Completed updating state model {}", 
segmentStateModelName);
-        zkClient.close();
+        LOGGER.info("Updating state model: {} to contain CONSUMING state", 
segmentStateModelName);
       }
+      helixDataAccessor
+          
.createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
   }
 
-  private static void createBrokerResourceIfNeeded(String helixClusterName, 
HelixAdmin admin,
+  private static void createBrokerResourceIfNeeded(String helixClusterName, 
HelixAdmin helixAdmin,
       boolean enableBatchMessageMode) {
     // Add broker resource online offline state model definition if needed
-    StateModelDefinition brokerResourceStateModelDefinition = 
admin.getStateModelDef(helixClusterName,
+    StateModelDefinition brokerResourceStateModelDefinition = 
helixAdmin.getStateModelDef(helixClusterName,
         
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL);
     if (brokerResourceStateModelDefinition == null) {
       LOGGER.info("Adding state model definition named : {} generated using : 
{}",
           
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
           
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString());
-      admin.addStateModelDef(helixClusterName,
+      helixAdmin.addStateModelDef(helixClusterName,
           
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
           
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
     }
 
     // Create broker resource if needed.
-    IdealState brokerResourceIdealState =
-        admin.getResourceIdealState(helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
-    if (brokerResourceIdealState == null) {
+    if (helixAdmin.getResourceIdealState(helixClusterName, 
BROKER_RESOURCE_INSTANCE) == null) {
       LOGGER.info("Adding empty ideal state for Broker!");
-      HelixHelper
-          .updateResourceConfigsFor(new HashMap<>(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, helixClusterName,
-              admin);
-      IdealState idealState = PinotTableIdealStateBuilder
-          .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, 
enableBatchMessageMode);
-      admin.setResourceIdealState(helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
+      IdealState emptyIdealStateForBrokerResource = PinotTableIdealStateBuilder
+          .buildEmptyIdealStateForBrokerResource(helixAdmin, helixClusterName, 
enableBatchMessageMode);
+      helixAdmin.setResourceIdealState(helixClusterName, 
BROKER_RESOURCE_INSTANCE, emptyIdealStateForBrokerResource);
     }
   }
 
-  private static void createLeadControllerResourceIfNeeded(String 
helixClusterName, HelixAdmin admin,
+  private static void createLeadControllerResourceIfNeeded(String 
helixClusterName, HelixAdmin helixAdmin,
       boolean enableBatchMessageMode) {
-    StateModelDefinition masterSlaveStateModelDefinition =
-        admin.getStateModelDef(helixClusterName, MasterSlaveSMD.name);
-    if (masterSlaveStateModelDefinition == null) {
-      LOGGER.info("Adding state model definition named : {} generated using : 
{}", MasterSlaveSMD.name,
-          MasterSlaveSMD.class.toString());
-      admin.addStateModelDef(helixClusterName, MasterSlaveSMD.name, 
MasterSlaveSMD.build());
-    }
-
-    IdealState leadControllerResourceIdealState =
-        admin.getResourceIdealState(helixClusterName, 
LEAD_CONTROLLER_RESOURCE_NAME);
-    if (leadControllerResourceIdealState == null) {
+    if (helixAdmin.getResourceIdealState(helixClusterName, 
LEAD_CONTROLLER_RESOURCE_NAME) == null) {
       LOGGER.info("Cluster {} doesn't contain {}. Creating one.", 
helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
-      HelixHelper.updateResourceConfigsFor(new HashMap<>(), 
LEAD_CONTROLLER_RESOURCE_NAME, helixClusterName, admin);
-      // FULL-AUTO Master-Slave state model with CrushED reBalance strategy.
-      admin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
-          
CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, 
MasterSlaveSMD.name,
-          IdealState.RebalanceMode.FULL_AUTO.toString(), 
CrushEdRebalanceStrategy.class.getName());
 
+      // FULL-AUTO Master-Slave state model with CrushED rebalance strategy.
+      IdealState leadControllerResourceIdealState = new 
IdealState(LEAD_CONTROLLER_RESOURCE_NAME);
+      
leadControllerResourceIdealState.setNumPartitions(NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+      leadControllerResourceIdealState.setReplicas(Integer.toString(0));
+      
leadControllerResourceIdealState.setStateModelDefRef(MasterSlaveSMD.name);
+      
leadControllerResourceIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+      
leadControllerResourceIdealState.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
       // Set instance group tag for lead controller resource.
-      IdealState leadControllerIdealState =
-          admin.getResourceIdealState(helixClusterName, 
LEAD_CONTROLLER_RESOURCE_NAME);
-      
leadControllerIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
-      leadControllerIdealState.setBatchMessageMode(enableBatchMessageMode);
+      
leadControllerResourceIdealState.setInstanceGroupTag(CONTROLLER_INSTANCE_TYPE);
+      
leadControllerResourceIdealState.setBatchMessageMode(enableBatchMessageMode);
       // The below config guarantees if active number of replicas is no less 
than minimum active replica, there will not be partition movements happened.
       // Set min active replicas to 0 and rebalance delay to 5 minutes so that 
if any master goes offline, Helix controller waits at most 5 minutes and then 
re-calculate the participant assignment.
       // This delay is helpful when periodic tasks are running and we don't 
want them to be re-run too frequently.
       // Plus, if virtual id is applied to controller hosts, swapping hosts 
would be easy as new hosts can use the same virtual id and it takes least 
effort to change the configs.
-      leadControllerIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS);
-      leadControllerIdealState.setRebalanceDelay(REBALANCE_DELAY_MS);
-      
leadControllerIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE);
-      admin.setResourceIdealState(helixClusterName, 
LEAD_CONTROLLER_RESOURCE_NAME, leadControllerIdealState);
-
+      
leadControllerResourceIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS);
+      leadControllerResourceIdealState.setRebalanceDelay(REBALANCE_DELAY_MS);
+      
leadControllerResourceIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE);
       // Explicitly disable this resource when creating this new resource.
       // When all the controllers are running the code with the logic to 
handle this resource, it can be enabled for backward compatibility.
       // In the next major release, we can enable this resource by default, so 
that all the controller logic can be separated.
-      admin.enableResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, 
false);
-
-      LOGGER.info("Re-balance lead controller resource with replicas: {}",
-          CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
-      // Set it to 1 so that there's only 1 instance (i.e. master) shown in 
every partitions.
-      admin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
-          CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
-    }
-  }
+      // To enable the resource:
+      // helixAdmin.enableResource(helixClusterName, 
LEAD_CONTROLLER_RESOURCE_NAME, true);
+      // helixAdmin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, 
LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+      leadControllerResourceIdealState.enable(false);
 
-  private static void initPropertyStoreIfNeeded(String helixClusterName, 
String zkPath) {
-    String propertyStorePath = 
PropertyPathBuilder.propertyStore(helixClusterName);
-    ZkHelixPropertyStore<ZNRecord> propertyStore =
-        new ZkHelixPropertyStore<>(zkPath, new ZNRecordSerializer(), 
propertyStorePath);
-    if (!propertyStore.exists("/CONFIGS", AccessOption.PERSISTENT)) {
-      propertyStore.create("/CONFIGS", new ZNRecord(""), 
AccessOption.PERSISTENT);
-    }
-    if (!propertyStore.exists("/CONFIGS/CLUSTER", AccessOption.PERSISTENT)) {
-      propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), 
AccessOption.PERSISTENT);
-    }
-    if (!propertyStore.exists("/CONFIGS/TABLE", AccessOption.PERSISTENT)) {
-      propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), 
AccessOption.PERSISTENT);
-    }
-    if (!propertyStore.exists("/CONFIGS/INSTANCE", AccessOption.PERSISTENT)) {
-      propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), 
AccessOption.PERSISTENT);
-    }
-    if (!propertyStore.exists("/SCHEMAS", AccessOption.PERSISTENT)) {
-      propertyStore.create("/SCHEMAS", new ZNRecord(""), 
AccessOption.PERSISTENT);
-    }
-    if (!propertyStore.exists("/SEGMENTS", AccessOption.PERSISTENT)) {
-      propertyStore.create("/SEGMENTS", new ZNRecord(""), 
AccessOption.PERSISTENT);
+      helixAdmin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, 
leadControllerResourceIdealState);
     }
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
index 8f72a59..2382939 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
@@ -30,7 +30,6 @@ import org.testng.annotations.Test;
 
 
 public class ControllerPeriodicTaskStarterTest extends ControllerTest {
-  private MockControllerStarter _mockControllerStarter;
 
   @BeforeClass
   public void setup() {
@@ -51,32 +50,13 @@ public class ControllerPeriodicTaskStarterTest extends 
ControllerTest {
   }
 
   @Override
-  protected void startControllerStarter(ControllerConf config) {
-    _mockControllerStarter = new MockControllerStarter(config);
-    _mockControllerStarter.start();
-    _helixResourceManager = _mockControllerStarter.getHelixResourceManager();
-    _helixManager = _mockControllerStarter.getHelixControllerManager();
+  protected ControllerStarter getControllerStarter(ControllerConf config) {
+    return new MockControllerStarter(config);
   }
 
-  @Override
-  protected void stopControllerStarter() {
-    Assert.assertNotNull(_mockControllerStarter);
-
-    _mockControllerStarter.stop();
-    _mockControllerStarter = null;
-  }
-
-  @Override
-  protected ControllerStarter getControllerStarter() {
-    return _mockControllerStarter;
-  }
-
-  private class MockControllerStarter extends TestOnlyControllerStarter {
-
+  private class MockControllerStarter extends ControllerStarter {
     private static final int NUM_PERIODIC_TASKS = 7;
 
-    private List<PeriodicTask> _controllerPeriodicTasks;
-
     public MockControllerStarter(ControllerConf conf) {
       super(conf);
     }
@@ -90,14 +70,10 @@ public class ControllerPeriodicTaskStarterTest extends 
ControllerTest {
       Assert.assertNotNull(helixResourceManager.getHelixClusterName());
       Assert.assertNotNull(helixResourceManager.getPropertyStore());
 
-      _controllerPeriodicTasks = super.setupControllerPeriodicTasks();
-      Assert.assertNotNull(_controllerPeriodicTasks);
-      Assert.assertEquals(_controllerPeriodicTasks.size(), NUM_PERIODIC_TASKS);
-      return _controllerPeriodicTasks;
-    }
-
-    List<PeriodicTask> getControllerPeriodicTasks() {
-      return _controllerPeriodicTasks;
+      List<PeriodicTask> controllerPeriodicTasks = 
super.setupControllerPeriodicTasks();
+      Assert.assertNotNull(controllerPeriodicTasks);
+      Assert.assertEquals(controllerPeriodicTasks.size(), NUM_PERIODIC_TASKS);
+      return controllerPeriodicTasks;
     }
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 9893377..4c5ff81 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix;
 
+import com.google.common.base.Preconditions;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -38,7 +39,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.data.DimensionFieldSpec;
 import org.apache.pinot.common.data.FieldSpec;
@@ -55,10 +55,9 @@ import org.testng.Assert;
  * Base class for controller tests.
  */
 public abstract class ControllerTest {
-  public static final String LOCAL_HOST = "localhost";
-
-  private static final int DEFAULT_CONTROLLER_PORT = 18998;
-  private static final String DEFAULT_DATA_DIR =
+  protected static final String LOCAL_HOST = "localhost";
+  protected static final int DEFAULT_CONTROLLER_PORT = 18998;
+  protected static final String DEFAULT_DATA_DIR =
       new File(FileUtils.getTempDirectoryPath(), "test-controller-" + 
System.currentTimeMillis()).getAbsolutePath();
 
   protected int _controllerPort;
@@ -66,7 +65,6 @@ public abstract class ControllerTest {
   protected ControllerRequestURLBuilder _controllerRequestURLBuilder;
   protected String _controllerDataDir;
 
-  protected ZkClient _zkClient;
   protected ControllerStarter _controllerStarter;
   protected PinotHelixResourceManager _helixResourceManager;
   protected HelixManager _helixManager;
@@ -95,58 +93,33 @@ public abstract class ControllerTest {
     }
   }
 
-  public static ControllerConf getDefaultControllerConfiguration() {
+  public ControllerConf getDefaultControllerConfiguration() {
     ControllerConf config = new ControllerConf();
     config.setControllerHost(LOCAL_HOST);
     config.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT));
     config.setDataDir(DEFAULT_DATA_DIR);
     config.setZkStr(ZkStarter.DEFAULT_ZK_STR);
+    config.setHelixClusterName(getHelixClusterName());
 
     return config;
   }
 
-  public class TestOnlyControllerStarter extends ControllerStarter {
-
-    TestOnlyControllerStarter(ControllerConf conf) {
-      super(conf);
-    }
-
-    @Override
-    public boolean isPinotOnlyModeSupported() {
-      return true;
-    }
-  }
-
   protected void startController() {
     startController(getDefaultControllerConfiguration());
   }
 
   protected void startController(ControllerConf config) {
-    startController(config, true);
-  }
-
-  protected void startController(ControllerConf config, boolean deleteCluster) 
{
-    Assert.assertNotNull(config);
-    Assert.assertNull(_controllerStarter);
+    Preconditions.checkState(_controllerStarter == null);
 
     _controllerPort = Integer.valueOf(config.getControllerPort());
     _controllerBaseApiUrl = "http://localhost:" + _controllerPort;
     _controllerRequestURLBuilder = 
ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl);
     _controllerDataDir = config.getDataDir();
 
-    String helixClusterName = getHelixClusterName();
-    config.setHelixClusterName(helixClusterName);
-
-    String zkStr = config.getZkStr();
-    _zkClient = new ZkClient(zkStr);
-    if (_zkClient.exists("/" + helixClusterName) && deleteCluster) {
-      _zkClient.deleteRecursive("/" + helixClusterName);
-    }
-
     startControllerStarter(config);
 
     // HelixResourceManager is null in Helix only mode, while HelixManager is 
null in Pinot only mode.
-    switch (getControllerStarter().getControllerMode()) {
+    switch (_controllerStarter.getControllerMode()) {
       case DUAL:
       case PINOT_ONLY:
         _helixAdmin = _helixResourceManager.getHelixAdmin();
@@ -160,16 +133,19 @@ public abstract class ControllerTest {
   }
 
   protected void startControllerStarter(ControllerConf config) {
-    _controllerStarter = new TestOnlyControllerStarter(config);
+    _controllerStarter = getControllerStarter(config);
     _controllerStarter.start();
     _helixResourceManager = _controllerStarter.getHelixResourceManager();
     _helixManager = _controllerStarter.getHelixControllerManager();
   }
 
+  protected ControllerStarter getControllerStarter(ControllerConf config) {
+    return new ControllerStarter(config);
+  }
+
   protected void stopController() {
     stopControllerStarter();
     FileUtils.deleteQuietly(new File(_controllerDataDir));
-    _zkClient.close();
   }
 
   protected void stopControllerStarter() {
@@ -179,10 +155,6 @@ public abstract class ControllerTest {
     _controllerStarter = null;
   }
 
-  protected ControllerStarter getControllerStarter() {
-    return _controllerStarter;
-  }
-
   protected Schema createDummySchema(String tableName) {
     Schema schema = new Schema();
     schema.setSchemaName(tableName);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index d91e612..67ebfe5 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -18,11 +18,11 @@
  */
 package org.apache.pinot.controller.helix;
 
+import java.util.Arrays;
 import java.util.Map;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.ExternalView;
-import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerStarter;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -32,183 +32,186 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT;
+
 
 public class PinotControllerModeTest extends ControllerTest {
   private static long TIMEOUT_IN_MS = 10_000L;
-  private ControllerConf config;
-  private int controllerPortOffset;
 
   @BeforeClass
   public void setUp() {
     startZk();
-    config = getDefaultControllerConfiguration();
-    controllerPortOffset = 0;
   }
 
   @Test
-  public void testHelixOnlyController()
-      throws Exception {
-    config.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY);
-    
config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
 + controllerPortOffset++));
-
-    startController(config);
-    TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), 
TIMEOUT_IN_MS,
-        "Failed to start " + config.getControllerMode() + " controller in " + 
TIMEOUT_IN_MS + "ms.");
-
-    Assert.assertEquals(_controllerStarter.getControllerMode(), 
ControllerConf.ControllerMode.HELIX_ONLY);
-
-    stopController();
-    _controllerStarter = null;
+  public void testHelixOnlyController() {
+    ControllerConf helixOnlyControllerConfig = 
getDefaultControllerConfiguration();
+    
helixOnlyControllerConfig.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY);
+    ControllerStarter helixOnlyController = 
getControllerStarter(helixOnlyControllerConfig);
+    helixOnlyController.start();
+    TestUtils.waitForCondition(aVoid -> 
helixOnlyController.getHelixControllerManager().isConnected(), TIMEOUT_IN_MS,
+        "Failed to start the Helix-only controller");
+
+    helixOnlyController.stop();
   }
 
   @Test
-  public void testDualModeController()
-      throws Exception {
-    config.setControllerMode(ControllerConf.ControllerMode.DUAL);
-    
config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
 + controllerPortOffset++));
-
+  public void testDualModeController() {
     // Helix cluster will be set up when starting the first controller.
-    startController(config);
-    TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), 
TIMEOUT_IN_MS,
-        "Failed to start " + config.getControllerMode() + " controller in " + 
TIMEOUT_IN_MS + "ms.");
-    Assert.assertEquals(_controllerStarter.getControllerMode(), 
ControllerConf.ControllerMode.DUAL);
-
-    // Enable the lead controller resource.
-    _helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+    ControllerConf firstDualModeControllerConfig = 
getDefaultControllerConfiguration();
+    
firstDualModeControllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL);
+    ControllerStarter firstDualModeController = 
getControllerStarter(firstDualModeControllerConfig);
+    firstDualModeController.start();
+    TestUtils
+        .waitForCondition(aVoid -> 
firstDualModeController.getHelixControllerManager().isConnected(), 
TIMEOUT_IN_MS,
+            "Failed to start the first dual-mode controller");
 
     // Starting a second dual-mode controller. Helix cluster has already been 
set up.
-    ControllerConf controllerConfig = getDefaultControllerConfiguration();
-    controllerConfig.setHelixClusterName(getHelixClusterName());
-    controllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL);
-    controllerConfig.setControllerPort(
-        Integer.toString(Integer.parseInt(this.config.getControllerPort()) + 
controllerPortOffset++));
-
-    ControllerStarter secondDualModeController = new 
TestOnlyControllerStarter(controllerConfig);
+    ControllerConf secondDualModeControllerConfig = 
getDefaultControllerConfiguration();
+    
secondDualModeControllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL);
+    
secondDualModeControllerConfig.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT
 + 1));
+    ControllerStarter secondDualModeController = 
getControllerStarter(secondDualModeControllerConfig);
     secondDualModeController.start();
     TestUtils
         .waitForCondition(aVoid -> 
secondDualModeController.getHelixResourceManager().getHelixZkManager().isConnected(),
-            TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " 
controller in " + TIMEOUT_IN_MS + "ms.");
-    Assert.assertEquals(secondDualModeController.getControllerMode(), 
ControllerConf.ControllerMode.DUAL);
+            TIMEOUT_IN_MS, "Failed to start the second dual-mode controller");
 
     secondDualModeController.stop();
-    stopController();
-    _controllerStarter = null;
+    firstDualModeController.stop();
   }
 
   // TODO: enable it after removing ControllerLeadershipManager which requires 
both CONTROLLER and PARTICIPANT
   //       HelixManager
   @Test(enabled = false)
-  public void testPinotOnlyController()
-      throws Exception {
-    config.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
-    
config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
 + controllerPortOffset++));
+  public void testPinotOnlyController() {
+    ControllerConf firstPinotOnlyControllerConfig = 
getDefaultControllerConfiguration();
+    
firstPinotOnlyControllerConfig.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+    ControllerStarter firstPinotOnlyController = 
getControllerStarter(firstPinotOnlyControllerConfig);
 
     // Starting pinot only controller before starting helix controller should 
fail.
     try {
-      startController(config);
-      Assert.fail("Starting pinot only controller should fail!");
-    } catch (RuntimeException e) {
-      _controllerStarter = null;
+      firstPinotOnlyController.start();
+      Assert.fail("Starting Pinot-only controller without Helix controller 
should fail");
+    } catch (Exception e) {
+      // Expected
     }
 
     // Starting a helix controller.
-    ControllerConf config2 = getDefaultControllerConfiguration();
-    config2.setHelixClusterName(getHelixClusterName());
-    config2.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY);
-    
config2.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
 + controllerPortOffset++));
-    ControllerStarter helixControllerStarter = new ControllerStarter(config2);
-    helixControllerStarter.start();
-    HelixManager helixControllerManager = 
helixControllerStarter.getHelixControllerManager();
+    ControllerConf helixOnlyControllerConfig = 
getDefaultControllerConfiguration();
+    
helixOnlyControllerConfig.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY);
+    
helixOnlyControllerConfig.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT
 + 1));
+    ControllerStarter helixOnlyController = new 
ControllerStarter(helixOnlyControllerConfig);
+    helixOnlyController.start();
+    HelixManager helixControllerManager = 
helixOnlyController.getHelixControllerManager();
     HelixAdmin helixAdmin = helixControllerManager.getClusterManagmentTool();
     TestUtils.waitForCondition(aVoid -> helixControllerManager.isConnected(), 
TIMEOUT_IN_MS,
-        "Failed to start " + config2.getControllerMode() + " controller in " + 
TIMEOUT_IN_MS + "ms.");
-
-    // Enable the lead controller resource.
-    helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+        "Failed to start the Helix-only controller");
 
     // Starting a pinot only controller.
-    ControllerConf config3 = getDefaultControllerConfiguration();
-    config3.setHelixClusterName(getHelixClusterName());
-    config3.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
-    
config3.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
 + controllerPortOffset++));
-
-    ControllerStarter firstPinotOnlyController = new 
TestOnlyControllerStarter(config3);
     firstPinotOnlyController.start();
-    PinotHelixResourceManager firstPinotOnlyPinotHelixResourceManager =
-        firstPinotOnlyController.getHelixResourceManager();
+    PinotHelixResourceManager helixResourceManager = 
firstPinotOnlyController.getHelixResourceManager();
+    TestUtils.waitForCondition(aVoid -> 
helixResourceManager.getHelixZkManager().isConnected(), TIMEOUT_IN_MS,
+        "Failed to start the first Pinot-only controller");
 
-    TestUtils.waitForCondition(aVoid -> 
firstPinotOnlyPinotHelixResourceManager.getHelixZkManager().isConnected(),
-        TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " 
controller in " + TIMEOUT_IN_MS + "ms.");
-    Assert.assertEquals(firstPinotOnlyController.getControllerMode(), 
ControllerConf.ControllerMode.PINOT_ONLY);
+    // Enable the lead controller resource.
+    helixAdmin.enableResource(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME, true);
+    helixAdmin.rebalance(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME, 
LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView leadControllerResourceExternalView =
+          helixAdmin.getResourceExternalView(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME);
+      for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
+        Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
+        if (stateMap.size() != 1 || !stateMap.values().contains("MASTER")) {
+          return false;  // keep polling: partition does not yet have exactly one MASTER replica
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Failed to choose the only participant as MASTER");
 
     // Start a second Pinot only controller.
-    ControllerConf config4 = getDefaultControllerConfiguration();
-    config4.setHelixClusterName(getHelixClusterName());
-    config4.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
-    
config4.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
 + controllerPortOffset++));
-
-    ControllerStarter secondControllerStarter = new 
TestOnlyControllerStarter(config4);
-    secondControllerStarter.start();
-    // Two controller instances assigned to cluster.
-    TestUtils
-        .waitForCondition(aVoid -> 
firstPinotOnlyPinotHelixResourceManager.getAllInstances().size() == 2, 
TIMEOUT_IN_MS,
-            "Failed to start the 2nd pinot only controller in " + 
TIMEOUT_IN_MS + "ms.");
+    ControllerConf secondPinotOnlyControllerConfig = 
getDefaultControllerConfiguration();
+    
secondPinotOnlyControllerConfig.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+    
secondPinotOnlyControllerConfig.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT
 + 2));
+    ControllerStarter secondPinotOnlyController = 
getControllerStarter(secondPinotOnlyControllerConfig);
+    secondPinotOnlyController.start();
+    TestUtils.waitForCondition(
+        aVoid -> 
secondPinotOnlyController.getHelixResourceManager().getHelixZkManager().isConnected(),
 TIMEOUT_IN_MS,
+        "Failed to start the second Pinot-only controller");
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView leadControllerResourceExternalView =
+          helixAdmin.getResourceExternalView(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME);
+      for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
+        Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
+        if (stateMap.size() != 2 || 
!stateMap.values().containsAll(Arrays.asList("MASTER", "SLAVE"))) {
+          return false;  // keep polling: partition does not yet have one MASTER and one SLAVE
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Failed to choose two participants as MASTER and SLAVE");
 
     // Disable lead controller resource, all the participants are in offline 
state (from slave state).
-    helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, false);
-
+    helixAdmin.enableResource(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME, false);
     TestUtils.waitForCondition(aVoid -> {
-      ExternalView leadControllerResourceExternalView = 
firstPinotOnlyPinotHelixResourceManager.getHelixAdmin()
-          .getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      ExternalView leadControllerResourceExternalView =
+          helixAdmin.getResourceExternalView(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME);
       for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
         Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
-        for (Map.Entry<String, String> entry : stateMap.entrySet()) {
-          if (!"OFFLINE".equals(entry.getValue())) {
+        if (stateMap.size() != 2) {
+          return false;
+        }
+        for (String value : stateMap.values()) {
+          if (!value.equals("OFFLINE")) {
             return false;
           }
         }
       }
       return true;
-    }, TIMEOUT_IN_MS, "Failed to mark all the participants offline in " + 
TIMEOUT_IN_MS + "ms.");
+    }, TIMEOUT_IN_MS, "Failed to turn all the participants OFFLINE");
 
-    // Re-enable lead controller resource, all the participants are in healthy 
state (either master or slave).
-    helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+    // Re-enable lead controller resource, all the participants are in healthy 
state (either MASTER or SLAVE).
+    helixAdmin.enableResource(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME, true);
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView leadControllerResourceExternalView =
+          helixAdmin.getResourceExternalView(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME);
+      for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
+        Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
+        if (stateMap.size() != 2 || 
!stateMap.values().containsAll(Arrays.asList("MASTER", "SLAVE"))) {
+          return false;  // keep polling: replicas have not yet returned to MASTER + SLAVE
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Failed to choose two participants as MASTER and SLAVE 
from OFFLINE");
 
     // Shutdown one controller, it will be removed from external view of lead 
controller resource.
-    secondControllerStarter.stop();
-
+    secondPinotOnlyController.stop();
     TestUtils.waitForCondition(aVoid -> {
-      ExternalView leadControllerResourceExternalView = 
firstPinotOnlyPinotHelixResourceManager.getHelixAdmin()
-          .getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      ExternalView leadControllerResourceExternalView =
+          helixAdmin.getResourceExternalView(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME);
       for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
         Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
-        // Only 1 participant left in each partition, which will become the 
master.
-        for (Map.Entry<String, String> entry : stateMap.entrySet()) {
-          if (!"MASTER".equals(entry.getValue())) {
-            return false;
-          }
+        if (stateMap.size() != 1 || !stateMap.values().contains("MASTER")) {
+          return false;  // keep polling: the remaining participant is not yet the sole MASTER
         }
       }
       return true;
-    }, TIMEOUT_IN_MS, "Failed to mark all the participants MASTER in " + 
TIMEOUT_IN_MS + "ms.");
+    }, TIMEOUT_IN_MS, "Failed to drop the first disconnected participant");
 
     // Shutdown the only one controller left, the partition map should be 
empty.
     firstPinotOnlyController.stop();
     TestUtils.waitForCondition(aVoid -> {
-      ExternalView leadControllerResourceExternalView = helixAdmin
-          .getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      ExternalView leadControllerResourceExternalView =
+          helixAdmin.getResourceExternalView(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME);
       for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
         Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
-        // There's no participant in all the partitions.
         if (!stateMap.isEmpty()) {
           return false;
         }
       }
       return true;
-    }, TIMEOUT_IN_MS, "Failed to have all the partitions empty in " + 
TIMEOUT_IN_MS + "ms.");
+    }, TIMEOUT_IN_MS, "Failed to drop the second disconnected participant");
 
-    _controllerStarter = null;
-    helixControllerStarter.stop();
+    helixOnlyController.stop();
   }
 
   @AfterClass
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index a850ac4..21226a4 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -62,8 +62,11 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.apache.pinot.common.utils.CommonConstants.Helix.*;
-import static 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager.*;
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT;
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE;
+import static 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager.InvalidTableConfigException;
 
 
 public class PinotHelixResourceManagerTest extends ControllerTest {
@@ -82,28 +85,33 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   private final String _helixClusterName = getHelixClusterName();
 
   @BeforeClass
-  public void setUp() throws Exception {
+  public void setUp()
+      throws Exception {
     startZk();
     ControllerConf config = getDefaultControllerConfiguration();
     config.setTenantIsolationEnabled(false);
     startController(config);
 
-    
ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName,
-        ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false);
-    
ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName,
 ZkStarter.DEFAULT_ZK_STR,
-        NUM_INSTANCES, false, BASE_SERVER_ADMIN_PORT);
+    ControllerRequestBuilderUtil
+        .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES,
+            false);
+    ControllerRequestBuilderUtil
+        .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false,
+            BASE_SERVER_ADMIN_PORT);
 
     // Create server tenant on all Servers
-    Tenant serverTenant = new 
Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER)
-        .setOfflineInstances(NUM_INSTANCES)
-        .build();
+    Tenant serverTenant =
+        new 
Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER).setOfflineInstances(NUM_INSTANCES)
+            .build();
     _helixResourceManager.createServerTenant(serverTenant);
 
     _helixAdmin.enableResource(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME, true);
+    _helixAdmin.rebalance(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME, LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
   }
 
   @Test
-  public void testGetInstanceEndpoints() throws InvalidConfigException {
+  public void testGetInstanceEndpoints()
+      throws InvalidConfigException {
     Set<String> servers = 
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
     BiMap<String, String> endpoints = 
_helixResourceManager.getDataInstanceAdminEndpoints(servers);
     for (int i = 0; i < NUM_INSTANCES; i++) {
@@ -112,7 +120,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   }
 
   @Test
-  public void testGetInstanceConfigs() throws Exception {
+  public void testGetInstanceConfigs()
+      throws Exception {
     Set<String> servers = 
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
     for (String server : servers) {
       InstanceConfig cachedInstanceConfig = 
_helixResourceManager.getHelixInstanceConfig(server);
@@ -129,7 +138,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     zkClient.close();
   }
 
-  private void modifyExistingInstanceConfig(ZkClient zkClient) throws 
InterruptedException {
+  private void modifyExistingInstanceConfig(ZkClient zkClient)
+      throws InterruptedException {
     String instanceName = "Server_localhost_" + new 
Random().nextInt(NUM_INSTANCES);
     String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
     Assert.assertTrue(zkClient.exists(instanceConfigPath));
@@ -160,7 +170,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     zkClient.writeData(instanceConfigPath, znRecord);
   }
 
-  private void addAndRemoveNewInstanceConfig(ZkClient zkClient) throws 
Exception {
+  private void addAndRemoveNewInstanceConfig(ZkClient zkClient)
+      throws Exception {
     int biggerRandomNumber = NUM_INSTANCES + new 
Random().nextInt(NUM_INSTANCES);
     String instanceName = "Server_localhost_" + 
String.valueOf(biggerRandomNumber);
     String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
@@ -193,18 +204,16 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   }
 
   @Test
-  public void testRebuildBrokerResourceFromHelixTags() throws Exception {
+  public void testRebuildBrokerResourceFromHelixTags()
+      throws Exception {
     // Create broker tenant on 3 Brokers
     Tenant brokerTenant =
         new 
Tenant.TenantBuilder(BROKER_TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(3).build();
     _helixResourceManager.createBrokerTenant(brokerTenant);
 
     // Create the table
-    TableConfig tableConfig = new 
TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setNumReplicas(3)
-        .setBrokerTenant(BROKER_TENANT_NAME)
-        .setServerTenant(SERVER_TENANT_NAME)
-        .build();
+    TableConfig tableConfig = new 
TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(3)
+        
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build();
     _helixResourceManager.addTable(tableConfig);
 
     // Check that the BrokerResource ideal state has 3 Brokers assigned to the 
table
@@ -214,8 +223,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
 
     // Untag all Brokers assigned to broker tenant
     for (String brokerInstance : 
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
-      _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
-          TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+      _helixAdmin
+          .removeInstanceTag(_helixClusterName, brokerInstance, 
TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
       _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
 
@@ -238,7 +247,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   }
 
   @Test
-  public void testRetrieveMetadata() throws Exception {
+  public void testRetrieveMetadata() {
     String segmentName = "testSegment";
 
     // Test retrieving OFFLINE segment ZK metadata
@@ -463,8 +472,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   }
 
   @Test
-  public void testLeadControllerResource()
-      throws Exception {
+  public void testLeadControllerResource() {
     IdealState leadControllerResourceIdealState = 
_helixResourceManager.getHelixAdmin()
         .getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
     Assert.assertTrue(leadControllerResourceIdealState.isValid());
@@ -476,25 +484,23 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     Assert.assertEquals(leadControllerResourceIdealState.getReplicas(),
         Integer.toString(LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT));
     Assert.assertEquals(leadControllerResourceIdealState.getRebalanceMode(), 
IdealState.RebalanceMode.FULL_AUTO);
-    Assert.assertTrue(leadControllerResourceIdealState.getInstanceSet(
-        
leadControllerResourceIdealState.getPartitionSet().iterator().next()).isEmpty());
-
-    TestUtils
-        .waitForCondition(aVoid -> {
-              ExternalView leadControllerResourceExternalView = 
_helixResourceManager.getHelixAdmin()
-                  .getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
-              for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
-                Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
-                Map.Entry<String, String> entry = 
stateMap.entrySet().iterator().next();
-                boolean result = (PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + 
"_" + _controllerPort).equals(entry.getKey());
-                result &= "MASTER".equals(entry.getValue());
-                if (!result) {
-                  return false;
-                }
-              }
-              return true;
-            },
-            TIMEOUT_IN_MS, "Failed to assign controller hosts to lead 
controller resource in " + TIMEOUT_IN_MS + " ms.");
+    Assert.assertTrue(leadControllerResourceIdealState
+        
.getInstanceSet(leadControllerResourceIdealState.getPartitionSet().iterator().next()).isEmpty());
+
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView leadControllerResourceExternalView = 
_helixResourceManager.getHelixAdmin()
+          .getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
+        Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
+        Map.Entry<String, String> entry = 
stateMap.entrySet().iterator().next();
+        boolean result = (PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + "_" + 
_controllerPort).equals(entry.getKey());
+        result &= "MASTER".equals(entry.getValue());
+        if (!result) {
+          return false;
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Failed to assign controller hosts to lead controller 
resource in " + TIMEOUT_IN_MS + " ms.");
   }
 
   @Test
@@ -536,9 +542,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
             + port + "}{}{TAG_LIST=[controller]}"));
       }
       clusterDataCache.setInstanceConfigMap(instanceConfigMap);
-      ZNRecord znRecord =
-          crushEdRebalanceStrategy.computePartitionAssignment(instanceNames, 
instanceNames, new HashMap<>(0),
-              clusterDataCache);
+      ZNRecord znRecord = crushEdRebalanceStrategy
+          .computePartitionAssignment(instanceNames, instanceNames, new 
HashMap<>(0), clusterDataCache);
 
       Assert.assertNotNull(znRecord);
       Map<String, List<String>> listFields = znRecord.getListFields();
@@ -551,8 +556,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
         if 
(!instanceToMasterAssignmentCountMap.containsKey(assignments.get(0))) {
           instanceToMasterAssignmentCountMap.put(assignments.get(0), 1);
         } else {
-          instanceToMasterAssignmentCountMap.put(assignments.get(0),
-              instanceToMasterAssignmentCountMap.get(assignments.get(0)) + 1);
+          instanceToMasterAssignmentCountMap
+              .put(assignments.get(0), 
instanceToMasterAssignmentCountMap.get(assignments.get(0)) + 1);
         }
         maxCount = 
Math.max(instanceToMasterAssignmentCountMap.get(assignments.get(0)), maxCount);
       }
@@ -568,8 +573,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   public void cleanUpBrokerTags() {
     // Untag all Brokers for other tests
     for (String brokerInstance : 
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
-      _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
-          TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+      _helixAdmin
+          .removeInstanceTag(_helixClusterName, brokerInstance, 
TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
       _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java 
b/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java
index e1fdb14..176c120 100644
--- a/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java
@@ -140,35 +140,32 @@ public class TestUtils {
    * @param timeoutMs Timeout in milliseconds
    * @param errorMessage Error message if condition is not met before timed out
    */
-  public static void waitForCondition(@Nonnull Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs,
-      @Nullable String errorMessage)
-      throws Exception {
+  public static void waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs,
+      @Nullable String errorMessage) {
     long endTime = System.currentTimeMillis() + timeoutMs;
-    try {
-      while (System.currentTimeMillis() < endTime) {
-        Boolean isConditionMet = condition.apply(null);
-        if ((isConditionMet != null) && isConditionMet) {
+    while (System.currentTimeMillis() < endTime) {
+      try {
+        if (Boolean.TRUE.equals(condition.apply(null))) {
           return;
         }
         Thread.sleep(checkIntervalMs);
+      } catch (Exception e) {
+        if (errorMessage != null) {
+          Assert.fail("Caught exception while checking the condition, error message: " + errorMessage, e);
+        } else {
+          Assert.fail("Caught exception while checking the condition", e);
+        }
       }
-      if (errorMessage != null) {
-        Assert.fail("Failed to meet condition in " + timeoutMs + "ms, error message: " + errorMessage);
-      } else {
-        Assert.fail("Failed to meet condition in " + timeoutMs + "ms");
-      }
-    } catch (Exception e) {
-      if (errorMessage != null) {
-        Assert.fail("Caught exception while checking the condition, error message: " + errorMessage, e);
-      } else {
-        Assert.fail("Caught exception while checking the condition", e);
-      }
+    }
+    if (errorMessage != null) {
+      Assert.fail("Failed to meet condition in " + timeoutMs + "ms, error message: " + errorMessage);
+    } else {
+      Assert.fail("Failed to meet condition in " + timeoutMs + "ms");
     }
   }
 
-  public static void waitForCondition(@Nonnull Function<Void, Boolean> condition, long timeoutMs,
-      @Nullable String errorMessage)
-      throws Exception {
+  public static void waitForCondition(Function<Void, Boolean> condition, long timeoutMs,
+      @Nullable String errorMessage) {
     waitForCondition(condition, 1000L, timeoutMs, errorMessage);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to