This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9316610  Clean up test utils, disconnect fake instances (#4476)
9316610 is described below

commit 93166101f7d1d97c6042f519c2b019106361268b
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Jul 29 18:50:18 2019 -0700

    Clean up test utils, disconnect fake instances (#4476)
    
    - Remove redundant util classes
    - Move controller test related methods into ControllerTest for easier usage
    - Disconnect fake instances created in the test
    - Update state model to include the missing transitions
    - Fix the flakiness in ValidationManagerTest
    - No change for the test logic
---
 .../broker/broker/HelixBrokerStarterTest.java      |   9 +-
 .../helix/ControllerRequestBuilderUtil.java        | 152 -----------
 .../EmptyBrokerOnlineOfflineStateModelFactory.java |  70 -----
 ...EmptySegmentOnlineOfflineStateModelFactory.java |  81 ------
 .../pinot/controller/api/PinotFileUploadTest.java  |  39 ++-
 .../api/PinotSegmentRestletResourceTest.java       |  29 +--
 .../api/PinotTableRestletResourceTest.java         |  11 +-
 .../api/PinotTenantRestletResourceTest.java        |  12 +-
 .../pinot/controller/api/TableViewsTest.java       |  12 +-
 .../helix/ControllerInstanceToggleTest.java        |  33 ++-
 .../controller/helix/ControllerSentinelTestV2.java |  45 ++--
 .../controller/helix/ControllerTenantTest.java     | 111 ++++----
 .../pinot/controller/helix/ControllerTest.java     | 281 +++++++++++++++++++--
 .../pinot/controller/helix/HelixHelperTest.java    |   6 +-
 .../controller/helix/PinotResourceManagerTest.java |  43 ++--
 .../helix/core/PinotHelixResourceManagerTest.java  |  57 ++---
 .../ReplicaGroupRebalanceStrategyTest.java         |  33 +--
 .../sharding/SegmentAssignmentStrategyTest.java    | 133 ++++------
 .../validation/ValidationManagerTest.java          |  49 ++--
 .../tests/BaseClusterIntegrationTest.java          |  20 +-
 .../tests/BaseClusterIntegrationTestSet.java       |  17 +-
 .../pinot/integration/tests/ClusterTest.java       |  22 +-
 .../ControllerPeriodicTasksIntegrationTests.java   |  85 ++++---
 .../tests/OfflineClusterIntegrationTest.java       |  18 +-
 .../tests/SegmentCompletionIntegrationTest.java    |  10 +-
 .../server/starter/helix/HelixServerStarter.java   |   6 +-
 26 files changed, 605 insertions(+), 779 deletions(-)

diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 8ecff02..df53801 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -41,7 +41,6 @@ import org.apache.pinot.common.utils.CommonConstants.Broker;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.util.TestUtils;
@@ -77,11 +76,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
     _brokerStarter = new HelixBrokerStarter(brokerConf, getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR);
     _brokerStarter.start();
 
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, NUM_BROKERS - 1,
-            true);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, NUM_SERVERS, true);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKERS - 1, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVERS, true);
 
     Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
         .addTime(TIME_COLUMN_NAME, TimeUnit.DAYS, 
FieldSpec.DataType.INT).build();
@@ -207,6 +203,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
 
   @AfterClass
   public void tearDown() {
+    stopFakeInstances();
     _brokerStarter.shutdown();
     stopController();
     stopZk();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestBuilderUtil.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestBuilderUtil.java
deleted file mode 100644
index bedc11a..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestBuilderUtil.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.pinot.common.config.TableNameBuilder;
-import org.apache.pinot.common.config.TagNameUtils;
-import org.apache.pinot.common.config.Tenant;
-import org.apache.pinot.common.config.Tenant.TenantBuilder;
-import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.TenantRole;
-
-import static 
org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
-import static 
org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
-
-
-public class ControllerRequestBuilderUtil {
-  private ControllerRequestBuilderUtil() {
-  }
-
-  public static void addFakeBrokerInstancesToAutoJoinHelixCluster(String 
helixClusterName, String zkServer,
-      int numInstances)
-      throws Exception {
-    addFakeBrokerInstancesToAutoJoinHelixCluster(helixClusterName, zkServer, 
numInstances, false);
-  }
-
-  public static void addFakeBrokerInstancesToAutoJoinHelixCluster(String 
helixClusterName, String zkServer,
-      int numInstances, boolean isSingleTenant)
-      throws Exception {
-    for (int i = 0; i < numInstances; ++i) {
-      final String brokerId = "Broker_localhost_" + i;
-      final HelixManager helixZkManager =
-          HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId, 
InstanceType.PARTICIPANT, zkServer);
-      final StateMachineEngine stateMachineEngine = 
helixZkManager.getStateMachineEngine();
-      final StateModelFactory<?> stateModelFactory = new 
EmptyBrokerOnlineOfflineStateModelFactory();
-      stateMachineEngine
-          
.registerStateModelFactory(EmptyBrokerOnlineOfflineStateModelFactory.getStateModelDef(),
 stateModelFactory);
-      helixZkManager.connect();
-      if (isSingleTenant) {
-        
helixZkManager.getClusterManagmentTool().addInstanceTag(helixClusterName, 
brokerId,
-            
TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
-      } else {
-        
helixZkManager.getClusterManagmentTool().addInstanceTag(helixClusterName, 
brokerId, UNTAGGED_BROKER_INSTANCE);
-      }
-    }
-  }
-
-  public static void addFakeDataInstancesToAutoJoinHelixCluster(String 
helixClusterName, String zkServer,
-      int numInstances)
-      throws Exception {
-    addFakeDataInstancesToAutoJoinHelixCluster(helixClusterName, zkServer, 
numInstances, false,
-        CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
-  }
-
-  public static void addFakeDataInstancesToAutoJoinHelixCluster(String 
helixClusterName, String zkServer,
-      int numInstances, boolean isSingleTenant)
-      throws Exception {
-    addFakeDataInstancesToAutoJoinHelixCluster(helixClusterName, zkServer, 
numInstances, isSingleTenant,
-        CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
-  }
-
-  public static void addFakeDataInstancesToAutoJoinHelixCluster(String 
helixClusterName, String zkServer,
-      int numInstances, boolean isSingleTenant, int adminPort)
-      throws Exception {
-
-    for (int i = 0; i < numInstances; ++i) {
-      final String instanceId = "Server_localhost_" + i;
-      addFakeDataInstanceToAutoJoinHelixCluster(helixClusterName, zkServer, 
instanceId, isSingleTenant, adminPort + i);
-    }
-  }
-
-  public static void addFakeDataInstanceToAutoJoinHelixCluster(String 
helixClusterName, String zkServer,
-      String instanceId)
-      throws Exception {
-    addFakeDataInstanceToAutoJoinHelixCluster(helixClusterName, zkServer, 
instanceId, false,
-        CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
-  }
-
-  public static void addFakeDataInstanceToAutoJoinHelixCluster(String 
helixClusterName, String zkServer,
-      String instanceId, boolean isSingleTenant)
-      throws Exception {
-    addFakeDataInstanceToAutoJoinHelixCluster(helixClusterName, zkServer, 
instanceId, isSingleTenant,
-        CommonConstants.Server.DEFAULT_ADMIN_API_PORT);
-  }
-
-  public static void addFakeDataInstanceToAutoJoinHelixCluster(String 
helixClusterName, String zkServer,
-      String instanceId, boolean isSingleTenant, int adminPort)
-      throws Exception {
-    final HelixManager helixZkManager =
-        HelixManagerFactory.getZKHelixManager(helixClusterName, instanceId, 
InstanceType.PARTICIPANT, zkServer);
-    final StateMachineEngine stateMachineEngine = 
helixZkManager.getStateMachineEngine();
-    final StateModelFactory<?> stateModelFactory = new 
EmptySegmentOnlineOfflineStateModelFactory();
-    stateMachineEngine
-        
.registerStateModelFactory(EmptySegmentOnlineOfflineStateModelFactory.getStateModelDef(),
 stateModelFactory);
-    helixZkManager.connect();
-    if (isSingleTenant) {
-      
helixZkManager.getClusterManagmentTool().addInstanceTag(helixClusterName, 
instanceId,
-          
TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
-      
helixZkManager.getClusterManagmentTool().addInstanceTag(helixClusterName, 
instanceId,
-          
TableNameBuilder.REALTIME.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
-    } else {
-      
helixZkManager.getClusterManagmentTool().addInstanceTag(helixClusterName, 
instanceId, UNTAGGED_SERVER_INSTANCE);
-    }
-    HelixConfigScope scope =
-        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, 
helixClusterName)
-            .forParticipant(instanceId).build();
-    Map<String, String> props = new HashMap<>();
-    props.put(CommonConstants.Helix.Instance.ADMIN_PORT_KEY, 
String.valueOf(adminPort));
-    helixZkManager.getClusterManagmentTool().setConfig(scope, props);
-  }
-
-  public static String buildBrokerTenantCreateRequestJSON(String tenantName, 
int numberOfInstances)
-      throws JsonProcessingException {
-    Tenant tenant = new 
TenantBuilder(tenantName).setRole(TenantRole.BROKER).setTotalInstances(numberOfInstances)
-        .setOfflineInstances(0).setRealtimeInstances(0).build();
-    return JsonUtils.objectToString(tenant);
-  }
-
-  public static String buildServerTenantCreateRequestJSON(String tenantName, 
int numberOfInstances,
-      int offlineInstances, int realtimeInstances)
-      throws JsonProcessingException {
-    Tenant tenant = new 
TenantBuilder(tenantName).setRole(TenantRole.SERVER).setTotalInstances(numberOfInstances)
-        
.setOfflineInstances(offlineInstances).setRealtimeInstances(realtimeInstances).build();
-    return JsonUtils.objectToString(tenant);
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptyBrokerOnlineOfflineStateModelFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptyBrokerOnlineOfflineStateModelFactory.java
deleted file mode 100644
index b7ae591..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptyBrokerOnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class EmptyBrokerOnlineOfflineStateModelFactory extends 
StateModelFactory<StateModel> {
-
-  @Override
-  public StateModel createNewStateModel(String partitionName) {
-    final EmptyBrokerOnlineOfflineStateModel SegmentOnlineOfflineStateModel = 
new EmptyBrokerOnlineOfflineStateModel();
-    return SegmentOnlineOfflineStateModel;
-  }
-
-  public EmptyBrokerOnlineOfflineStateModelFactory() {
-  }
-
-  public static String getStateModelDef() {
-    return "BrokerResourceOnlineOfflineStateModel";
-  }
-
-  @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = 
"OFFLINE")
-  public static class EmptyBrokerOnlineOfflineStateModel extends StateModel {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmptyBrokerOnlineOfflineStateModel.class);
-
-    @Transition(from = "OFFLINE", to = "ONLINE")
-    public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
-      
LOGGER.info("EmptyBrokerOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " 
+ message);
-    }
-
-    @Transition(from = "ONLINE", to = "OFFLINE")
-    public void onBecomeOfflineFromOnline(Message message, NotificationContext 
context) {
-      
LOGGER.info("EmptyBrokerOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " 
+ message);
-    }
-
-    @Transition(from = "OFFLINE", to = "DROPPED")
-    public void onBecomeDroppedFromOffline(Message message, 
NotificationContext context) {
-      
LOGGER.info("EmptyBrokerOnlineOfflineStateModel.onBecomeDroppedFromOffline() : 
" + message);
-    }
-
-    @Transition(from = "ONLINE", to = "DROPPED")
-    public void onBecomeDroppedFromOnline(Message message, NotificationContext 
context) {
-      
LOGGER.info("EmptyBrokerOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " 
+ message);
-    }
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptySegmentOnlineOfflineStateModelFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptySegmentOnlineOfflineStateModelFactory.java
deleted file mode 100644
index 48b8f05..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/EmptySegmentOnlineOfflineStateModelFactory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.participant.statemachine.StateModelInfo;
-import org.apache.helix.participant.statemachine.Transition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class EmptySegmentOnlineOfflineStateModelFactory extends 
StateModelFactory<StateModel> {
-
-  @Override
-  public StateModel createNewStateModel(String partitionName) {
-    final EmptySegmentOnlineOfflineStateModel SegmentOnlineOfflineStateModel =
-        new EmptySegmentOnlineOfflineStateModel();
-    return SegmentOnlineOfflineStateModel;
-  }
-
-  public EmptySegmentOnlineOfflineStateModelFactory() {
-  }
-
-  public static String getStateModelDef() {
-    return "SegmentOnlineOfflineStateModel";
-  }
-
-  @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = 
"OFFLINE")
-  public static class EmptySegmentOnlineOfflineStateModel extends StateModel {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmptySegmentOnlineOfflineStateModel.class);
-
-    @Transition(from = "OFFLINE", to = "ONLINE")
-    public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
-      
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : 
" + message);
-    }
-
-    @Transition(from = "OFFLINE", to = "CONSUMING")
-    public void onBecomeConsumingFromOffline(Message message, 
NotificationContext context) {
-      
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() 
: " + message);
-    }
-
-    @Transition(from = "ONLINE", to = "OFFLINE")
-    public void onBecomeOfflineFromOnline(Message message, NotificationContext 
context) {
-      
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : 
" + message);
-    }
-
-    @Transition(from = "OFFLINE", to = "DROPPED")
-    public void onBecomeDroppedFromOffline(Message message, 
NotificationContext context) {
-      
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : 
" + message);
-    }
-
-    @Transition(from = "ONLINE", to = "DROPPED")
-    public void onBecomeDroppedFromOnline(Message message, NotificationContext 
context) {
-      
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : 
" + message);
-    }
-
-    @Transition(from = "CONSUMING", to = "DROPPED")
-    public void onBecomeDroppedFromConsuming(Message message, 
NotificationContext context) {
-      
LOGGER.info("EmptySegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() 
: " + message);
-    }
-  }
-}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
index 6d1ba67..d0c7511 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotFileUploadTest.java
@@ -25,8 +25,6 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -41,6 +39,20 @@ import org.testng.annotations.Test;
 public class PinotFileUploadTest extends ControllerTest {
   private static final String TABLE_NAME = "testTable";
 
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    startZk();
+    startController();
+    addFakeBrokerInstancesToAutoJoinHelixCluster(5, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(5, true);
+
+    // Adding table
+    TableConfig tableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME)
+        
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(2).build();
+    _helixResourceManager.addTable(tableConfig);
+  }
+
   @Test
   public void testUploadBogusData()
       throws Exception {
@@ -54,28 +66,9 @@ public class PinotFileUploadTest extends ControllerTest {
     Assert.assertTrue(statusCode >= 400 && statusCode < 500, "Status code = " 
+ statusCode);
   }
 
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    startZk();
-    startController();
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, 5, true);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, 5, true);
-
-    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_BROKER").size(),
-        5);
-
-    // Adding table
-    TableConfig tableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME)
-        
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(2).build();
-    _helixResourceManager.addTable(tableConfig);
-  }
-
   @AfterClass
-  public void tearDown()
-      throws Exception {
+  public void tearDown() {
+    stopFakeInstances();
     stopController();
     stopZk();
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
index 65d3d00..5f1dd94 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotSegmentRestletResourceTest.java
@@ -25,8 +25,6 @@ import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.testng.Assert;
@@ -36,22 +34,15 @@ import org.testng.annotations.Test;
 
 
 public class PinotSegmentRestletResourceTest extends ControllerTest {
-  private final static String ZK_SERVER = ZkStarter.DEFAULT_ZK_STR;
-  private final static String TABLE_NAME = "testTable";
+  private static final String TABLE_NAME = "testTable";
 
   @BeforeClass
   public void setUp()
       throws Exception {
     startZk();
     startController();
-
-    
ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
 ZK_SERVER, 1, true);
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZK_SERVER, 1, true);
-
-    while (_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), 
"DefaultTenant_OFFLINE").size() == 0) {
-      Thread.sleep(100);
-    }
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(1, true);
 
     
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_OFFLINE").size(),
         1);
@@ -59,13 +50,6 @@ public class PinotSegmentRestletResourceTest extends 
ControllerTest {
         1);
   }
 
-  @AfterClass
-  public void tearDown()
-      throws Exception {
-    stopController();
-    stopZk();
-  }
-
   @Test
   public void testSegmentCrcApi()
       throws Exception {
@@ -123,4 +107,11 @@ public class PinotSegmentRestletResourceTest extends 
ControllerTest {
     }
     Assert.assertEquals(crcMap.size(), expectedSize);
   }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeInstances();
+    stopController();
+    stopZk();
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 71dd4c0..6e4155d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -25,9 +25,7 @@ import org.apache.pinot.common.config.QuotaConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
@@ -60,12 +58,8 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
     startController(config);
     _createTableUrl = _controllerRequestURLBuilder.forTableCreate();
 
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR,
-            NUM_BROKER_INSTANCES, true);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR,
-            NUM_SERVER_INSTANCES, true);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
 
     
_offlineBuilder.setTableName("testOfflineTable").setTimeColumnName("timeColumn").setTimeType("DAYS")
         .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5");
@@ -308,6 +302,7 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
 
   @AfterClass
   public void tearDown() {
+    stopFakeInstances();
     stopController();
     stopZk();
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
index b5aa4cc..fa38d8a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
@@ -23,8 +23,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -81,12 +79,8 @@ public class PinotTenantRestletResourceTest extends 
ControllerTest {
     // Add a table to the server
     String createTableUrl = _controllerRequestURLBuilder.forTableCreate();
 
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR,
-            NUM_BROKER_INSTANCES, true);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR,
-            NUM_SERVER_INSTANCES, true);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
 
     
_offlineBuilder.setTableName("testOfflineTable").setTimeColumnName("timeColumn").setTimeType("DAYS")
         
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setServerTenant("DefaultTenant");
@@ -101,6 +95,8 @@ public class PinotTenantRestletResourceTest extends 
ControllerTest {
         
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("DefaultTenant")));
     assertEquals(tableList.get("tables").size(), 1, "Expected 1 table");
     assertEquals(tableList.get("tables").get(0).asText(), "mytable_OFFLINE");
+
+    stopFakeInstances();
   }
 
   @AfterClass
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
index 0194d42..5ce8ced 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableViewsTest.java
@@ -26,9 +26,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.api.resources.TableViews;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
@@ -52,13 +50,8 @@ public class TableViewsTest extends ControllerTest {
       throws Exception {
     startZk();
     startController();
-
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR,
-            NUM_BROKER_INSTANCES, true);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR,
-            NUM_SERVER_INSTANCES, true);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
 
     // Create the offline table and add one segment
     TableConfig tableConfig =
@@ -178,6 +171,7 @@ public class TableViewsTest extends ControllerTest {
 
   @AfterClass
   public void tearDown() {
+    stopFakeInstances();
     stopController();
     stopZk();
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
index 731fa3f..da923c6 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerInstanceToggleTest.java
@@ -24,7 +24,6 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.testng.Assert;
@@ -40,17 +39,13 @@ public class ControllerInstanceToggleTest extends 
ControllerTest {
   private static final String BROKER_TAG_NAME = 
TagNameUtils.getBrokerTagForTenant(null);
   private static final int NUM_INSTANCES = 3;
 
-  private final String _helixClusterName = getHelixClusterName();
-
   @BeforeClass
-  public void setup()
+  public void setUp()
       throws Exception {
     startZk();
     startController();
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, true);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, true);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, true);
   }
 
   @Test
@@ -62,41 +57,42 @@ public class ControllerInstanceToggleTest extends 
ControllerTest {
             .setNumReplicas(NUM_INSTANCES).build().toJsonConfigString();
     sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), 
tableJSONConfigString);
     Assert.assertEquals(
-        _helixAdmin.getResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
             .getPartitionSet().size(), 1);
     Assert.assertEquals(
-        _helixAdmin.getResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
             .getInstanceSet(OFFLINE_TABLE_NAME).size(), NUM_INSTANCES);
 
     // Add segments
     for (int i = 0; i < NUM_INSTANCES; i++) {
       _helixResourceManager
           .addNewSegment(RAW_TABLE_NAME, 
SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
-      Assert.assertEquals(_helixAdmin.getResourceIdealState(_helixClusterName, 
OFFLINE_TABLE_NAME).getNumPartitions(),
-          i + 1);
+      Assert
+          
.assertEquals(_helixAdmin.getResourceIdealState(getHelixClusterName(), 
OFFLINE_TABLE_NAME).getNumPartitions(),
+              i + 1);
     }
 
     // Disable server instances
     int numEnabledInstances = NUM_INSTANCES;
-    for (String instanceName : 
_helixAdmin.getInstancesInClusterWithTag(_helixClusterName, SERVER_TAG_NAME)) {
+    for (String instanceName : 
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), 
SERVER_TAG_NAME)) {
       
sendPostRequest(_controllerRequestURLBuilder.forInstanceState(instanceName), 
"disable");
       checkNumOnlineInstancesFromExternalView(OFFLINE_TABLE_NAME, 
--numEnabledInstances);
     }
 
     // Enable server instances
-    for (String instanceName : 
_helixAdmin.getInstancesInClusterWithTag(_helixClusterName, SERVER_TAG_NAME)) {
+    for (String instanceName : 
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), 
SERVER_TAG_NAME)) {
       
sendPostRequest(_controllerRequestURLBuilder.forInstanceState(instanceName), 
"ENABLE");
       checkNumOnlineInstancesFromExternalView(OFFLINE_TABLE_NAME, 
++numEnabledInstances);
     }
 
     // Disable broker instances
-    for (String instanceName : 
_helixAdmin.getInstancesInClusterWithTag(_helixClusterName, BROKER_TAG_NAME)) {
+    for (String instanceName : 
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), 
BROKER_TAG_NAME)) {
       
sendPostRequest(_controllerRequestURLBuilder.forInstanceState(instanceName), 
"Disable");
       
checkNumOnlineInstancesFromExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
 --numEnabledInstances);
     }
 
     // Enable broker instances
-    for (String instanceName : 
_helixAdmin.getInstancesInClusterWithTag(_helixClusterName, BROKER_TAG_NAME)) {
+    for (String instanceName : 
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), 
BROKER_TAG_NAME)) {
       
sendPostRequest(_controllerRequestURLBuilder.forInstanceState(instanceName), 
"Enable");
       
checkNumOnlineInstancesFromExternalView(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
 ++numEnabledInstances);
     }
@@ -104,7 +100,7 @@ public class ControllerInstanceToggleTest extends 
ControllerTest {
     // Delete table
     
sendDeleteRequest(_controllerRequestURLBuilder.forTableDelete(RAW_TABLE_NAME));
     Assert.assertEquals(
-        _helixAdmin.getResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
             .getPartitionSet().size(), 0);
   }
 
@@ -112,7 +108,7 @@ public class ControllerInstanceToggleTest extends 
ControllerTest {
       throws InterruptedException {
     long endTime = System.currentTimeMillis() + 10_000L;
     while (System.currentTimeMillis() < endTime) {
-      ExternalView resourceExternalView = 
_helixAdmin.getResourceExternalView(_helixClusterName, resourceName);
+      ExternalView resourceExternalView = 
_helixAdmin.getResourceExternalView(getHelixClusterName(), resourceName);
       Set<String> instanceSet = 
HelixHelper.getOnlineInstanceFromExternalView(resourceExternalView);
       if (instanceSet.size() == expectedNumOnlineInstances) {
         return;
@@ -124,6 +120,7 @@ public class ControllerInstanceToggleTest extends 
ControllerTest {
 
   @AfterClass
   public void tearDown() {
+    stopFakeInstances();
     stopController();
     stopZk();
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
index 97bf934..e00eea8 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerSentinelTestV2.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -31,23 +30,14 @@ import org.testng.annotations.Test;
 
 
 public class ControllerSentinelTestV2 extends ControllerTest {
-  private final String _helixClusterName = getHelixClusterName();
 
   @BeforeClass
-  public void setup()
+  public void setUp()
       throws Exception {
     startZk();
     startController();
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, 20, true);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, 20, true);
-  }
-
-  @AfterClass
-  public void tearDown() {
-    stopController();
-    stopZk();
+    addFakeBrokerInstancesToAutoJoinHelixCluster(20, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(20, true);
   }
 
   @Test
@@ -60,35 +50,40 @@ public class ControllerSentinelTestV2 extends 
ControllerTest {
             .build().toJsonConfigString();
     sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), 
tableJSONConfigString);
     Assert.assertEquals(
-        _helixAdmin.getResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
             .getPartitionSet().size(), 1);
     Assert.assertEquals(
-        _helixAdmin.getResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
             .getInstanceSet(tableName + "_OFFLINE").size(), 20);
 
     // Adding segments
     for (int i = 0; i < 10; ++i) {
-      Assert
-          .assertEquals(_helixAdmin.getResourceIdealState(_helixClusterName, 
tableName + "_OFFLINE").getNumPartitions(),
-              i);
+      Assert.assertEquals(
+          _helixAdmin.getResourceIdealState(getHelixClusterName(), tableName + 
"_OFFLINE").getNumPartitions(), i);
       _helixResourceManager
           .addNewSegment(tableName, 
SegmentMetadataMockUtils.mockSegmentMetadata(tableName), "downloadUrl");
-      Assert
-          .assertEquals(_helixAdmin.getResourceIdealState(_helixClusterName, 
tableName + "_OFFLINE").getNumPartitions(),
-              i + 1);
+      Assert.assertEquals(
+          _helixAdmin.getResourceIdealState(getHelixClusterName(), tableName + 
"_OFFLINE").getNumPartitions(), i + 1);
     }
 
     // Delete table
     sendDeleteRequest(_controllerRequestURLBuilder.forTableDelete(tableName));
     Assert.assertEquals(
-        _helixAdmin.getResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
             .getPartitionSet().size(), 0);
 
-    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
+    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
         
TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)).size(), 
20);
-    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
+    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
         
TagNameUtils.getRealtimeTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)).size(), 
20);
-    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(_helixClusterName,
+    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
         
TagNameUtils.getOfflineTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME)).size(), 
20);
   }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeInstances();
+    stopController();
+    stopZk();
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTenantTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTenantTest.java
index 59923d5..7bd3c56 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTenantTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTenantTest.java
@@ -24,7 +24,6 @@ import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -42,18 +41,14 @@ public class ControllerTenantTest extends ControllerTest {
   private static final int NUM_REALTIME_SERVERS_PER_TAG = 1;
   private static final int NUM_SERVERS_PER_TAG = NUM_OFFLINE_SERVERS_PER_TAG + 
NUM_REALTIME_SERVERS_PER_TAG;
 
-  private final String _helixClusterName = getHelixClusterName();
-
   @BeforeClass
   public void setUp()
       throws Exception {
     startZk();
     startController();
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, false);
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false);
   }
 
   @Test
@@ -62,15 +57,13 @@ public class ControllerTenantTest extends ControllerTest {
     // Create broker tenants
     for (int i = 1; i <= NUM_BROKER_TAGS; i++) {
       String brokerTenant = BROKER_TAG_PREFIX + i;
-      String payload =
-          
ControllerRequestBuilderUtil.buildBrokerTenantCreateRequestJSON(brokerTenant, 
NUM_BROKERS_PER_TAG);
-      sendPostRequest(_controllerRequestURLBuilder.forTenantCreate(), payload);
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
TagNameUtils.getBrokerTagForTenant(brokerTenant))
-              .size(), NUM_BROKERS_PER_TAG);
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE)
-              .size(), NUM_INSTANCES - i * NUM_BROKERS_PER_TAG);
+      createBrokerTenant(brokerTenant, NUM_BROKERS_PER_TAG);
+      Assert.assertEquals(_helixAdmin
+              .getInstancesInClusterWithTag(getHelixClusterName(), 
TagNameUtils.getBrokerTagForTenant(brokerTenant)).size(),
+          NUM_BROKERS_PER_TAG);
+      Assert.assertEquals(_helixAdmin
+              .getInstancesInClusterWithTag(getHelixClusterName(), 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE).size(),
+          NUM_INSTANCES - i * NUM_BROKERS_PER_TAG);
     }
 
     // Get broker tenants
@@ -90,26 +83,25 @@ public class ControllerTenantTest extends ControllerTest {
     // Update broker tenants
     for (int i = 0; i <= NUM_INSTANCES - (NUM_BROKER_TAGS - 1) * 
NUM_BROKERS_PER_TAG; i++) {
       String brokerTenant = BROKER_TAG_PREFIX + 1;
-      String payload = 
ControllerRequestBuilderUtil.buildBrokerTenantCreateRequestJSON(brokerTenant, 
i);
-      sendPutRequest(_controllerRequestURLBuilder.forTenantCreate(), payload);
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
TagNameUtils.getBrokerTagForTenant(brokerTenant))
-              .size(), i);
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE)
-              .size(), NUM_INSTANCES - (NUM_BROKER_TAGS - 1) * 
NUM_BROKERS_PER_TAG - i);
+      updateBrokerTenant(brokerTenant, i);
+      Assert.assertEquals(_helixAdmin
+              .getInstancesInClusterWithTag(getHelixClusterName(), 
TagNameUtils.getBrokerTagForTenant(brokerTenant)).size(),
+          i);
+      Assert.assertEquals(_helixAdmin
+              .getInstancesInClusterWithTag(getHelixClusterName(), 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE).size(),
+          NUM_INSTANCES - (NUM_BROKER_TAGS - 1) * NUM_BROKERS_PER_TAG - i);
     }
 
     // Delete broker tenants
     for (int i = 1; i <= NUM_BROKER_TAGS; i++) {
       String brokerTenant = BROKER_TAG_PREFIX + i;
       
sendDeleteRequest(_controllerRequestURLBuilder.forBrokerTenantDelete(brokerTenant));
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
TagNameUtils.getBrokerTagForTenant(brokerTenant))
-              .size(), 0);
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE)
-              .size(), NUM_INSTANCES - (NUM_BROKER_TAGS - i) * 
NUM_BROKERS_PER_TAG);
+      Assert.assertEquals(_helixAdmin
+              .getInstancesInClusterWithTag(getHelixClusterName(), 
TagNameUtils.getBrokerTagForTenant(brokerTenant)).size(),
+          0);
+      Assert.assertEquals(_helixAdmin
+              .getInstancesInClusterWithTag(getHelixClusterName(), 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE).size(),
+          NUM_INSTANCES - (NUM_BROKER_TAGS - i) * NUM_BROKERS_PER_TAG);
     }
   }
 
@@ -119,19 +111,16 @@ public class ControllerTenantTest extends ControllerTest {
     // Create server tenants
     for (int i = 1; i <= NUM_SERVER_TAGS; i++) {
       String serverTenant = SERVER_TAG_PREFIX + i;
-      String payload = ControllerRequestBuilderUtil
-          .buildServerTenantCreateRequestJSON(serverTenant, 
NUM_SERVERS_PER_TAG, NUM_OFFLINE_SERVERS_PER_TAG,
-              NUM_REALTIME_SERVERS_PER_TAG);
-      sendPostRequest(_controllerRequestURLBuilder.forTenantCreate(), payload);
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
TagNameUtils.getOfflineTagForTenant(serverTenant))
-              .size(), NUM_OFFLINE_SERVERS_PER_TAG);
+      createServerTenant(serverTenant, NUM_OFFLINE_SERVERS_PER_TAG, 
NUM_REALTIME_SERVERS_PER_TAG);
       Assert.assertEquals(_helixAdmin
-              .getInstancesInClusterWithTag(_helixClusterName, 
TagNameUtils.getRealtimeTagForTenant(serverTenant)).size(),
-          NUM_REALTIME_SERVERS_PER_TAG);
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE)
-              .size(), NUM_INSTANCES - i * NUM_SERVERS_PER_TAG);
+          .getInstancesInClusterWithTag(getHelixClusterName(), 
TagNameUtils.getOfflineTagForTenant(serverTenant))
+          .size(), NUM_OFFLINE_SERVERS_PER_TAG);
+      Assert.assertEquals(_helixAdmin
+          .getInstancesInClusterWithTag(getHelixClusterName(), 
TagNameUtils.getRealtimeTagForTenant(serverTenant))
+          .size(), NUM_REALTIME_SERVERS_PER_TAG);
+      Assert.assertEquals(_helixAdmin
+              .getInstancesInClusterWithTag(getHelixClusterName(), 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE).size(),
+          NUM_INSTANCES - i * NUM_SERVERS_PER_TAG);
     }
 
     // Get server tenants
@@ -152,39 +141,37 @@ public class ControllerTenantTest extends ControllerTest {
     // Note: server tenants cannot scale down
     for (int i = 0; i <= (NUM_INSTANCES - NUM_SERVER_TAGS * 
NUM_SERVERS_PER_TAG) / 2; i++) {
       String serverTenant = SERVER_TAG_PREFIX + 1;
-      String payload = ControllerRequestBuilderUtil
-          .buildServerTenantCreateRequestJSON(serverTenant, 
NUM_SERVERS_PER_TAG + i * 2,
-              NUM_OFFLINE_SERVERS_PER_TAG + i, NUM_REALTIME_SERVERS_PER_TAG + 
i);
-      sendPutRequest(_controllerRequestURLBuilder.forTenantCreate(), payload);
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
TagNameUtils.getOfflineTagForTenant(serverTenant))
-              .size(), NUM_OFFLINE_SERVERS_PER_TAG + i);
+      updateServerTenant(serverTenant, NUM_OFFLINE_SERVERS_PER_TAG + i, 
NUM_REALTIME_SERVERS_PER_TAG + i);
+      Assert.assertEquals(_helixAdmin
+          .getInstancesInClusterWithTag(getHelixClusterName(), 
TagNameUtils.getOfflineTagForTenant(serverTenant))
+          .size(), NUM_OFFLINE_SERVERS_PER_TAG + i);
+      Assert.assertEquals(_helixAdmin
+          .getInstancesInClusterWithTag(getHelixClusterName(), 
TagNameUtils.getRealtimeTagForTenant(serverTenant))
+          .size(), NUM_REALTIME_SERVERS_PER_TAG + i);
       Assert.assertEquals(_helixAdmin
-              .getInstancesInClusterWithTag(_helixClusterName, 
TagNameUtils.getRealtimeTagForTenant(serverTenant)).size(),
-          NUM_REALTIME_SERVERS_PER_TAG + i);
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE)
-              .size(), NUM_INSTANCES - NUM_SERVER_TAGS * NUM_SERVERS_PER_TAG - 
i * 2);
+              .getInstancesInClusterWithTag(getHelixClusterName(), 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE).size(),
+          NUM_INSTANCES - NUM_SERVER_TAGS * NUM_SERVERS_PER_TAG - i * 2);
     }
 
     // Delete server tenants
     for (int i = 1; i < NUM_SERVER_TAGS; i++) {
       String serverTenant = SERVER_TAG_PREFIX + i;
       
sendDeleteRequest(_controllerRequestURLBuilder.forServerTenantDelete(serverTenant));
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
TagNameUtils.getOfflineTagForTenant(serverTenant))
-              .size(), 0);
       Assert.assertEquals(_helixAdmin
-              .getInstancesInClusterWithTag(_helixClusterName, 
TagNameUtils.getRealtimeTagForTenant(serverTenant)).size(),
-          0);
-      Assert.assertEquals(
-          _helixAdmin.getInstancesInClusterWithTag(_helixClusterName, 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE)
-              .size(), NUM_INSTANCES - (NUM_SERVER_TAGS - i) * 
NUM_SERVERS_PER_TAG);
+          .getInstancesInClusterWithTag(getHelixClusterName(), 
TagNameUtils.getOfflineTagForTenant(serverTenant))
+          .size(), 0);
+      Assert.assertEquals(_helixAdmin
+          .getInstancesInClusterWithTag(getHelixClusterName(), 
TagNameUtils.getRealtimeTagForTenant(serverTenant))
+          .size(), 0);
+      Assert.assertEquals(_helixAdmin
+              .getInstancesInClusterWithTag(getHelixClusterName(), 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE).size(),
+          NUM_INSTANCES - (NUM_SERVER_TAGS - i) * NUM_SERVERS_PER_TAG);
     }
   }
 
   @AfterClass
   public void tearDown() {
+    stopFakeInstances();
     stopController();
     stopZk();
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 4d49192..ab1ea56 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Preconditions;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
@@ -28,7 +29,10 @@ import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.PutMethod;
@@ -39,21 +43,40 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.config.Tenant;
 import org.apache.pinot.common.data.DimensionFieldSpec;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.MetricFieldSpec;
 import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.utils.TenantRole;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerStarter;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.Instance.ADMIN_PORT_KEY;
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
+import static 
org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
+
 
 /**
  * Base class for controller tests.
@@ -63,6 +86,10 @@ public abstract class ControllerTest {
   protected static final int DEFAULT_CONTROLLER_PORT = 18998;
   protected static final String DEFAULT_DATA_DIR =
       new File(FileUtils.getTempDirectoryPath(), "test-controller-" + 
System.currentTimeMillis()).getAbsolutePath();
+  protected static final String BROKER_INSTANCE_ID_PREFIX = 
"Broker_localhost_";
+  protected static final String SERVER_INSTANCE_ID_PREFIX = 
"Server_localhost_";
+
+  protected final List<HelixManager> _fakeInstanceHelixManagers = new 
ArrayList<>();
 
   protected int _controllerPort;
   protected String _controllerBaseApiUrl;
@@ -120,7 +147,10 @@ public abstract class ControllerTest {
     _controllerRequestURLBuilder = 
ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl);
     _controllerDataDir = config.getDataDir();
 
-    startControllerStarter(config);
+    _controllerStarter = getControllerStarter(config);
+    _controllerStarter.start();
+    _helixResourceManager = _controllerStarter.getHelixResourceManager();
+    _helixManager = _controllerStarter.getHelixControllerManager();
 
     // HelixResourceManager is null in Helix only mode, while HelixManager is 
null in Pinot only mode.
     switch (_controllerStarter.getControllerMode()) {
@@ -143,27 +173,205 @@ public abstract class ControllerTest {
     }
   }
 
-  protected void startControllerStarter(ControllerConf config) {
-    _controllerStarter = getControllerStarter(config);
-    _controllerStarter.start();
-    _helixResourceManager = _controllerStarter.getHelixResourceManager();
-    _helixManager = _controllerStarter.getHelixControllerManager();
-  }
-
   protected ControllerStarter getControllerStarter(ControllerConf config) {
     return new ControllerStarter(config);
   }
 
   protected void stopController() {
-    stopControllerStarter();
+    _controllerStarter.stop();
+    _controllerStarter = null;
     FileUtils.deleteQuietly(new File(_controllerDataDir));
   }
 
-  protected void stopControllerStarter() {
-    Assert.assertNotNull(_controllerStarter);
+  protected void addFakeBrokerInstancesToAutoJoinHelixCluster(int 
numInstances, boolean isSingleTenant)
+      throws Exception {
+    for (int i = 0; i < numInstances; i++) {
+      addFakeBrokerInstanceToAutoJoinHelixCluster(BROKER_INSTANCE_ID_PREFIX + 
i, isSingleTenant);
+    }
+  }
 
-    _controllerStarter.stop();
-    _controllerStarter = null;
+  protected void addFakeBrokerInstanceToAutoJoinHelixCluster(String 
instanceId, boolean isSingleTenant)
+      throws Exception {
+    HelixManager helixManager = HelixManagerFactory
+        .getZKHelixManager(getHelixClusterName(), instanceId, 
InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
+    helixManager.getStateMachineEngine()
+        
.registerStateModelFactory(FakeBrokerResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
+            FakeBrokerResourceOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
+    helixManager.connect();
+    HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+    if (isSingleTenant) {
+      helixAdmin.addInstanceTag(getHelixClusterName(), instanceId, 
TagNameUtils.getBrokerTagForTenant(null));
+    } else {
+      helixAdmin.addInstanceTag(getHelixClusterName(), instanceId, 
UNTAGGED_BROKER_INSTANCE);
+    }
+    _fakeInstanceHelixManagers.add(helixManager);
+  }
+
+  public static class FakeBrokerResourceOnlineOfflineStateModelFactory extends 
StateModelFactory<StateModel> {
+    private static final String STATE_MODEL_DEF = 
"BrokerResourceOnlineOfflineStateModel";
+    private static final FakeBrokerResourceOnlineOfflineStateModelFactory 
FACTORY_INSTANCE =
+        new FakeBrokerResourceOnlineOfflineStateModelFactory();
+    private static final FakeBrokerResourceOnlineOfflineStateModel 
STATE_MODEL_INSTANCE =
+        new FakeBrokerResourceOnlineOfflineStateModel();
+
+    private FakeBrokerResourceOnlineOfflineStateModelFactory() {
+    }
+
+    @Override
+    public StateModel createNewStateModel(String resourceName, String 
partitionName) {
+      return STATE_MODEL_INSTANCE;
+    }
+
+    @SuppressWarnings("unused")
+    @StateModelInfo(states = "{'OFFLINE', 'ONLINE', 'DROPPED'}", initialState 
= "OFFLINE")
+    public static class FakeBrokerResourceOnlineOfflineStateModel extends 
StateModel {
+      private static final Logger LOGGER = 
LoggerFactory.getLogger(FakeBrokerResourceOnlineOfflineStateModel.class);
+
+      private FakeBrokerResourceOnlineOfflineStateModel() {
+      }
+
+      @Transition(from = "OFFLINE", to = "ONLINE")
+      public void onBecomeOnlineFromOffline(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeOnlineFromOffline(): {}", message);
+      }
+
+      @Transition(from = "OFFLINE", to = "DROPPED")
+      public void onBecomeDroppedFromOffline(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeDroppedFromOffline(): {}", message);
+      }
+
+      @Transition(from = "ONLINE", to = "OFFLINE")
+      public void onBecomeOfflineFromOnline(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeOfflineFromOnline(): {}", message);
+      }
+
+      @Transition(from = "ONLINE", to = "DROPPED")
+      public void onBecomeDroppedFromOnline(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeDroppedFromOnline(): {}", message);
+      }
+
+      @Transition(from = "ERROR", to = "OFFLINE")
+      public void onBecomeOfflineFromError(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeOfflineFromError(): {}", message);
+      }
+    }
+  }
+
+  protected void addFakeServerInstancesToAutoJoinHelixCluster(int 
numInstances, boolean isSingleTenant)
+      throws Exception {
+    addFakeServerInstancesToAutoJoinHelixCluster(numInstances, isSingleTenant, 
DEFAULT_ADMIN_API_PORT);
+  }
+
+  protected void addFakeServerInstancesToAutoJoinHelixCluster(int 
numInstances, boolean isSingleTenant,
+      int baseAdminPort)
+      throws Exception {
+    for (int i = 0; i < numInstances; i++) {
+      addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
i, isSingleTenant, baseAdminPort + i);
+    }
+  }
+
+  protected void addFakeServerInstanceToAutoJoinHelixCluster(String 
instanceId, boolean isSingleTenant)
+      throws Exception {
+    addFakeServerInstanceToAutoJoinHelixCluster(instanceId, isSingleTenant, 
DEFAULT_ADMIN_API_PORT);
+  }
+
+  protected void addFakeServerInstanceToAutoJoinHelixCluster(String 
instanceId, boolean isSingleTenant, int adminPort)
+      throws Exception {
+    HelixManager helixManager = HelixManagerFactory
+        .getZKHelixManager(getHelixClusterName(), instanceId, 
InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
+    helixManager.getStateMachineEngine()
+        
.registerStateModelFactory(FakeSegmentOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
+            FakeSegmentOnlineOfflineStateModelFactory.FACTORY_INSTANCE);
+    helixManager.connect();
+    HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
+    if (isSingleTenant) {
+      helixAdmin.addInstanceTag(getHelixClusterName(), instanceId, 
TagNameUtils.getOfflineTagForTenant(null));
+      helixAdmin.addInstanceTag(getHelixClusterName(), instanceId, 
TagNameUtils.getRealtimeTagForTenant(null));
+    } else {
+      helixAdmin.addInstanceTag(getHelixClusterName(), instanceId, 
UNTAGGED_SERVER_INSTANCE);
+    }
+    HelixConfigScope configScope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, 
getHelixClusterName())
+            .forParticipant(instanceId).build();
+    helixAdmin.setConfig(configScope, Collections.singletonMap(ADMIN_PORT_KEY, 
Integer.toString(adminPort)));
+    _fakeInstanceHelixManagers.add(helixManager);
+  }
+
+  public static class FakeSegmentOnlineOfflineStateModelFactory extends 
StateModelFactory<StateModel> {
+    private static final String STATE_MODEL_DEF = 
"SegmentOnlineOfflineStateModel";
+    private static final FakeSegmentOnlineOfflineStateModelFactory 
FACTORY_INSTANCE =
+        new FakeSegmentOnlineOfflineStateModelFactory();
+    private static final FakeSegmentOnlineOfflineStateModel 
STATE_MODEL_INSTANCE =
+        new FakeSegmentOnlineOfflineStateModel();
+
+    private FakeSegmentOnlineOfflineStateModelFactory() {
+    }
+
+    @Override
+    public StateModel createNewStateModel(String resourceName, String 
partitionName) {
+      return STATE_MODEL_INSTANCE;
+    }
+
+    @SuppressWarnings("unused")
+    @StateModelInfo(states = "{'OFFLINE', 'ONLINE', 'CONSUMING', 'DROPPED'}", 
initialState = "OFFLINE")
+    public static class FakeSegmentOnlineOfflineStateModel extends StateModel {
+      private static final Logger LOGGER = 
LoggerFactory.getLogger(FakeSegmentOnlineOfflineStateModel.class);
+
+      private FakeSegmentOnlineOfflineStateModel() {
+      }
+
+      @Transition(from = "OFFLINE", to = "ONLINE")
+      public void onBecomeOnlineFromOffline(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeOnlineFromOffline(): {}", message);
+      }
+
+      @Transition(from = "OFFLINE", to = "CONSUMING")
+      public void onBecomeConsumingFromOffline(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeConsumingFromOffline(): {}", message);
+      }
+
+      @Transition(from = "OFFLINE", to = "DROPPED")
+      public void onBecomeDroppedFromOffline(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeDroppedFromOffline(): {}", message);
+      }
+
+      @Transition(from = "ONLINE", to = "OFFLINE")
+      public void onBecomeOfflineFromOnline(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeOfflineFromOnline(): {}", message);
+      }
+
+      @Transition(from = "ONLINE", to = "DROPPED")
+      public void onBecomeDroppedFromOnline(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeDroppedFromOnline(): {}", message);
+      }
+
+      @Transition(from = "CONSUMING", to = "OFFLINE")
+      public void onBecomeOfflineFromConsuming(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeOfflineFromConsuming(): {}", message);
+      }
+
+      @Transition(from = "CONSUMING", to = "ONLINE")
+      public void onBecomeOnlineFromConsuming(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeOnlineFromConsuming(): {}", message);
+      }
+
+      @Transition(from = "CONSUMING", to = "DROPPED")
+      public void onBecomeDroppedFromConsuming(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeDroppedFromConsuming(): {}", message);
+      }
+
+      @Transition(from = "ERROR", to = "OFFLINE")
+      public void onBecomeOfflineFromError(Message message, 
NotificationContext context) {
+        LOGGER.debug("onBecomeOfflineFromError(): {}", message);
+      }
+    }
+  }
+
+  protected void stopFakeInstances() {
+    for (HelixManager helixManager : _fakeInstanceHelixManagers) {
+      helixManager.disconnect();
+    }
+    _fakeInstanceHelixManagers.clear();
   }
 
   protected Schema createDummySchema(String tableName) {
@@ -193,6 +401,45 @@ public abstract class ControllerTest {
     Assert.assertEquals(postMethod.getStatusCode(), 200);
   }
 
+  protected String getBrokerTenantRequestPayload(String tenantName, int 
numBrokers)
+      throws JsonProcessingException {
+    Tenant tenant =
+        new 
Tenant.TenantBuilder(tenantName).setRole(TenantRole.BROKER).setTotalInstances(numBrokers).build();
+    return JsonUtils.objectToString(tenant);
+  }
+
+  protected void createBrokerTenant(String tenantName, int numBrokers)
+      throws IOException {
+    sendPostRequest(_controllerRequestURLBuilder.forTenantCreate(),
+        getBrokerTenantRequestPayload(tenantName, numBrokers));
+  }
+
+  protected void updateBrokerTenant(String tenantName, int numBrokers)
+      throws IOException {
+    sendPutRequest(_controllerRequestURLBuilder.forTenantCreate(),
+        getBrokerTenantRequestPayload(tenantName, numBrokers));
+  }
+
+  protected String getServerTenantRequestPayload(String tenantName, int 
numOfflineServers, int numRealtimeServers)
+      throws JsonProcessingException {
+    Tenant tenant = new 
Tenant.TenantBuilder(tenantName).setRole(TenantRole.SERVER)
+        .setTotalInstances(numOfflineServers + 
numRealtimeServers).setOfflineInstances(numOfflineServers)
+        .setRealtimeInstances(numRealtimeServers).build();
+    return JsonUtils.objectToString(tenant);
+  }
+
+  protected void createServerTenant(String tenantName, int numOfflineServers, 
int numRealtimeServers)
+      throws IOException {
+    sendPostRequest(_controllerRequestURLBuilder.forTenantCreate(),
+        getServerTenantRequestPayload(tenantName, numOfflineServers, 
numRealtimeServers));
+  }
+
+  protected void updateServerTenant(String tenantName, int numOfflineServers, 
int numRealtimeServers)
+      throws IOException {
+    sendPutRequest(_controllerRequestURLBuilder.forTenantCreate(),
+        getServerTenantRequestPayload(tenantName, numOfflineServers, 
numRealtimeServers));
+  }
+
   public static String sendGetRequest(String urlString)
       throws IOException {
     return constructResponse(new URL(urlString).openStream());
@@ -211,7 +458,7 @@ public abstract class ControllerTest {
     if (payload != null && !payload.isEmpty()) {
       httpConnection.setDoOutput(true);
       try (BufferedWriter writer = new BufferedWriter(
-          new OutputStreamWriter(httpConnection.getOutputStream(), "UTF-8"))) {
+          new OutputStreamWriter(httpConnection.getOutputStream(), 
StandardCharsets.UTF_8))) {
         writer.write(payload, 0, payload.length());
         writer.flush();
       }
@@ -226,8 +473,8 @@ public abstract class ControllerTest {
     httpConnection.setDoOutput(true);
     httpConnection.setRequestMethod("PUT");
 
-    try (
-        BufferedWriter writer = new BufferedWriter(new 
OutputStreamWriter(httpConnection.getOutputStream(), "UTF-8"))) {
+    try (BufferedWriter writer = new BufferedWriter(
+        new OutputStreamWriter(httpConnection.getOutputStream(), 
StandardCharsets.UTF_8))) {
       writer.write(payload);
       writer.flush();
     }
@@ -246,7 +493,7 @@ public abstract class ControllerTest {
 
   private static String constructResponse(InputStream inputStream)
       throws IOException {
-    try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(inputStream, "UTF-8"))) {
+    try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
       StringBuilder responseBuilder = new StringBuilder();
       String line;
       while ((line = reader.readLine()) != null) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
index 191241d..a297b27 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
@@ -35,7 +35,6 @@ import org.testng.annotations.Test;
 public class HelixHelperTest extends ControllerTest {
   public static final String RESOURCE_NAME = "potato_OFFLINE";
   public static final String INSTANCE_NAME = "Server_1.2.3.4_1234";
-  private String helixClusterName;
 
   @BeforeClass
   public void setUp() {
@@ -46,8 +45,7 @@ public class HelixHelperTest extends ControllerTest {
     idealState.setStateModelDefRef("OnlineOffline");
     idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
     idealState.setReplicas("0");
-    helixClusterName = getHelixClusterName();
-    _helixAdmin.addResource(helixClusterName, RESOURCE_NAME, idealState);
+    _helixAdmin.addResource(getHelixClusterName(), RESOURCE_NAME, idealState);
   }
 
   /**
@@ -68,7 +66,7 @@ public class HelixHelperTest extends ControllerTest {
       }
     }, RetryPolicies.noDelayRetryPolicy(1));
 
-    IdealState resourceIdealState = 
_helixAdmin.getResourceIdealState(helixClusterName, RESOURCE_NAME);
+    IdealState resourceIdealState = 
_helixAdmin.getResourceIdealState(getHelixClusterName(), RESOURCE_NAME);
     for (int i = 0; i < numSegments; i++) {
       Assert.assertEquals(resourceIdealState.getInstanceStateMap("segment_" + 
i).get(INSTANCE_NAME), "ONLINE");
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index d65e818..06b5de8 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -22,13 +22,11 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -37,28 +35,23 @@ import org.testng.annotations.Test;
 
 
 public class PinotResourceManagerTest extends ControllerTest {
-  private final static String TABLE_NAME = "testTable";
-
-  private ZkStarter.ZookeeperInstance _zookeeperInstance;
-  private ZkClient _zkClient;
+  private static final String TABLE_NAME = "testTable";
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    _zookeeperInstance = ZkStarter.startLocalZkServer();
-    _zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
-
+    startZk();
     startController();
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(1, true);
 
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, 1, true);
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, 1, true);
-    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_BROKER").size(), 1);
-    Assert
-        
.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), 
"DefaultTenant_OFFLINE").size(), 1);
+    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_BROKER").size(),
+        1);
+    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_OFFLINE").size(),
+        1);
     Assert
-        
.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), 
"DefaultTenant_REALTIME").size(), 1);
+        
.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), 
"DefaultTenant_REALTIME").size(),
+            1);
 
     // Adding table
     TableConfig tableConfig =
@@ -78,11 +71,13 @@ public class PinotResourceManagerTest extends 
ControllerTest {
     
Assert.assertTrue(_helixResourceManager.updateZkMetadata("testTable_OFFLINE", 
segmentZKMetadata));
 
     // Update ZK metadata
-    Assert.assertEquals(
-        _helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE", 
"testSegment").getVersion(), 0);
+    Assert
+        
.assertEquals(_helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE",
 "testSegment").getVersion(),
+            0);
     
Assert.assertTrue(_helixResourceManager.updateZkMetadata("testTable_OFFLINE", 
segmentZKMetadata, 0));
-    Assert.assertEquals(
-        _helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE", 
"testSegment").getVersion(), 1);
+    Assert
+        
.assertEquals(_helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE",
 "testSegment").getVersion(),
+            1);
     
Assert.assertFalse(_helixResourceManager.updateZkMetadata("testTable_OFFLINE", 
segmentZKMetadata, 0));
   }
 
@@ -151,8 +146,8 @@ public class PinotResourceManagerTest extends 
ControllerTest {
 
   @AfterClass
   public void tearDown() {
-    _helixResourceManager.stop();
-    _zkClient.close();
-    ZkStarter.stopLocalZkServer(_zookeeperInstance);
+    stopFakeInstances();
+    stopController();
+    stopZk();
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index fc07811..d130e31 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -51,9 +51,7 @@ import 
org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.TenantRole;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -82,8 +80,6 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10;
   private static final long TIMEOUT_IN_MS = 10_000L;
 
-  private final String _helixClusterName = getHelixClusterName();
-
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -91,13 +87,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     ControllerConf config = getDefaultControllerConfiguration();
     config.setTenantIsolationEnabled(false);
     startController(config);
-
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES,
-            false);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false,
-            BASE_SERVER_ADMIN_PORT);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false, 
BASE_SERVER_ADMIN_PORT);
 
     // Create server tenant on all Servers
     Tenant serverTenant =
@@ -114,7 +105,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     Set<String> servers = 
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
     BiMap<String, String> endpoints = 
_helixResourceManager.getDataInstanceAdminEndpoints(servers);
     for (int i = 0; i < NUM_INSTANCES; i++) {
-      Assert.assertTrue(endpoints.inverse().containsKey("localhost:" + 
String.valueOf(BASE_SERVER_ADMIN_PORT + i)));
+      Assert.assertTrue(endpoints.inverse().containsKey("localhost:" + 
(BASE_SERVER_ADMIN_PORT + i)));
     }
   }
 
@@ -124,7 +115,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     Set<String> servers = 
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
     for (String server : servers) {
       InstanceConfig cachedInstanceConfig = 
_helixResourceManager.getHelixInstanceConfig(server);
-      InstanceConfig realInstanceConfig = 
_helixAdmin.getInstanceConfig(_helixClusterName, server);
+      InstanceConfig realInstanceConfig = 
_helixAdmin.getInstanceConfig(getHelixClusterName(), server);
       Assert.assertEquals(cachedInstanceConfig, realInstanceConfig);
     }
 
@@ -140,7 +131,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   private void modifyExistingInstanceConfig(ZkClient zkClient)
       throws InterruptedException {
     String instanceName = "Server_localhost_" + new 
Random().nextInt(NUM_INSTANCES);
-    String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+    String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(getHelixClusterName(), instanceName);
     Assert.assertTrue(zkClient.exists(instanceConfigPath));
     ZNRecord znRecord = zkClient.readData(instanceConfigPath, null);
 
@@ -172,8 +163,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   private void addAndRemoveNewInstanceConfig(ZkClient zkClient)
       throws Exception {
     int biggerRandomNumber = NUM_INSTANCES + new 
Random().nextInt(NUM_INSTANCES);
-    String instanceName = "Server_localhost_" + 
String.valueOf(biggerRandomNumber);
-    String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+    String instanceName = "Server_localhost_" + biggerRandomNumber;
+    String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(getHelixClusterName(), instanceName);
     Assert.assertFalse(zkClient.exists(instanceConfigPath));
     List<String> instances = _helixResourceManager.getAllInstances();
     Assert.assertFalse(instances.contains(instanceName));
@@ -217,19 +208,20 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
 
     // Check that the BrokerResource ideal state has 3 Brokers assigned to the 
table
     IdealState idealState = _helixResourceManager.getHelixAdmin()
-        .getResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+        .getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     
Assert.assertEquals(idealState.getInstanceStateMap(OFFLINE_TABLE_NAME).size(), 
3);
 
     // Untag all Brokers assigned to broker tenant
     for (String brokerInstance : 
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
-      _helixAdmin
-          .removeInstanceTag(_helixClusterName, brokerInstance, 
TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
-      _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+      _helixAdmin.removeInstanceTag(getHelixClusterName(), brokerInstance,
+          TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+      _helixAdmin.addInstanceTag(getHelixClusterName(), brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
 
     // Rebuilding the broker tenant should update the ideal state size
     
_helixResourceManager.rebuildBrokerResourceFromHelixTags(OFFLINE_TABLE_NAME);
-    idealState = _helixAdmin.getResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    idealState =
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     
Assert.assertEquals(idealState.getInstanceStateMap(OFFLINE_TABLE_NAME).size(), 
0);
 
     // Create broker tenant on 5 Brokers
@@ -238,7 +230,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
 
     // Rebuilding the broker tenant should update the ideal state size
     
_helixResourceManager.rebuildBrokerResourceFromHelixTags(OFFLINE_TABLE_NAME);
-    idealState = _helixAdmin.getResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    idealState =
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     
Assert.assertEquals(idealState.getInstanceStateMap(OFFLINE_TABLE_NAME).size(), 
5);
 
     // Delete the table
@@ -289,13 +282,13 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
 
     String testBrokerInstance =
         
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME).iterator().next();
-    _helixAdmin.addInstanceTag(_helixClusterName, testBrokerInstance, 
"wrong_tag");
+    _helixAdmin.addInstanceTag(getHelixClusterName(), testBrokerInstance, 
"wrong_tag");
 
     brokerTenantNames = _helixResourceManager.getAllBrokerTenantNames();
     Assert.assertEquals(brokerTenantNames.size(), 1);
     Assert.assertEquals(brokerTenantNames.iterator().next(), 
BROKER_TENANT_NAME);
 
-    _helixAdmin.removeInstanceTag(_helixClusterName, testBrokerInstance, 
"wrong_tag");
+    _helixAdmin.removeInstanceTag(getHelixClusterName(), testBrokerInstance, 
"wrong_tag");
 
     // Server tenant is already created during setup.
     Set<String> serverTenantNames = 
_helixResourceManager.getAllServerTenantNames();
@@ -304,13 +297,13 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
 
     String testServerInstance =
         
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME).iterator().next();
-    _helixAdmin.addInstanceTag(_helixClusterName, testServerInstance, 
"wrong_tag");
+    _helixAdmin.addInstanceTag(getHelixClusterName(), testServerInstance, 
"wrong_tag");
 
     serverTenantNames = _helixResourceManager.getAllServerTenantNames();
     Assert.assertEquals(serverTenantNames.size(), 1);
     Assert.assertEquals(serverTenantNames.iterator().next(), 
SERVER_TENANT_NAME);
 
-    _helixAdmin.removeInstanceTag(_helixClusterName, testServerInstance, 
"wrong_tag");
+    _helixAdmin.removeInstanceTag(getHelixClusterName(), testServerInstance, 
"wrong_tag");
   }
 
   @Test
@@ -465,8 +458,9 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     }
 
     for (String brokerInstance : 
_helixResourceManager.getAllInstancesForBrokerTenant(brokerTag)) {
-      _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance, 
TagNameUtils.getBrokerTagForTenant(brokerTag));
-      _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+      _helixAdmin
+          .removeInstanceTag(getHelixClusterName(), brokerInstance, 
TagNameUtils.getBrokerTagForTenant(brokerTag));
+      _helixAdmin.addInstanceTag(getHelixClusterName(), brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
   }
 
@@ -572,14 +566,15 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   public void cleanUpBrokerTags() {
     // Untag all Brokers for other tests
     for (String brokerInstance : 
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
-      _helixAdmin
-          .removeInstanceTag(_helixClusterName, brokerInstance, 
TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
-      _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+      _helixAdmin.removeInstanceTag(getHelixClusterName(), brokerInstance,
+          TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+      _helixAdmin.addInstanceTag(getHelixClusterName(), brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
   }
 
   @AfterClass
   public void tearDown() {
+    stopFakeInstances();
     stopController();
     stopZk();
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
index 84c8b50..2350b87 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceStrategyTest.java
@@ -31,9 +31,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignment;
 import 
org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerator;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.ReplicaGroupTestUtils;
 import org.testng.Assert;
@@ -49,9 +47,9 @@ public class ReplicaGroupRebalanceStrategyTest extends 
ControllerTest {
   private static final int INITIAL_NUM_SEGMENTS = 20;
 
   private static final String TABLE_NAME = "testReplicaRebalanceReplace";
-  private final static String PARTITION_COLUMN = "memberId";
-  private final static String OFFLINE_TENENT_NAME = "DefaultTenant_OFFLINE";
-  private final static String NEW_SEGMENT_PREFIX = "new_segment_";
+  private static final String PARTITION_COLUMN = "memberId";
+  private static final String OFFLINE_TENENT_NAME = "DefaultTenant_OFFLINE";
+  private static final String NEW_SEGMENT_PREFIX = "new_segment_";
 
   private final TableConfig.Builder _offlineBuilder = new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE);
 
@@ -63,12 +61,8 @@ public class ReplicaGroupRebalanceStrategyTest extends 
ControllerTest {
       ControllerConf config = getDefaultControllerConfiguration();
       config.setTableMinReplicas(MIN_NUM_REPLICAS);
       startController(config);
-      ControllerRequestBuilderUtil
-          .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR,
-              NUM_BROKER_INSTANCES, true);
-      ControllerRequestBuilderUtil
-          .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR,
-              NUM_SERVER_INSTANCES, true);
+      addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
+      addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
 
       
_offlineBuilder.setTableName("testOfflineTable").setTimeColumnName("timeColumn").setTimeType("DAYS")
           .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5");
@@ -78,9 +72,7 @@ public class ReplicaGroupRebalanceStrategyTest extends 
ControllerTest {
       // Join 4 more servers as untagged
       String[] instanceNames = {"Server_localhost_a", "Server_localhost_b", 
"Server_localhost_c", "Server_localhost_d"};
       for (String instanceName : instanceNames) {
-        ControllerRequestBuilderUtil
-            .addFakeDataInstanceToAutoJoinHelixCluster(getHelixClusterName(), 
ZkStarter.DEFAULT_ZK_STR, instanceName,
-                true);
+        addFakeServerInstanceToAutoJoinHelixCluster(instanceName, true);
         _helixAdmin.removeInstanceTag(getHelixClusterName(), instanceName, 
OFFLINE_TENENT_NAME);
       }
     } catch (Exception e) {
@@ -88,12 +80,6 @@ public class ReplicaGroupRebalanceStrategyTest extends 
ControllerTest {
     }
   }
 
-  @AfterClass
-  public void tearDown() {
-    stopController();
-    stopZk();
-  }
-
   @Test
   public void testReplicaGroupRebalanceStrategy()
       throws Exception {
@@ -324,4 +310,11 @@ public class ReplicaGroupRebalanceStrategyTest extends 
ControllerTest {
     _helixResourceManager
         .setExistingTableConfig(tableConfig, tableNameWithType, 
CommonConstants.Helix.TableType.OFFLINE);
   }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeInstances();
+    stopController();
+    stopZk();
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
index 836824c..5376911 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
@@ -22,8 +22,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.helix.HelixManager;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.config.IndexingConfig;
@@ -35,71 +33,39 @@ import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignment;
 import 
org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerator;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.ReplicaGroupTestUtils;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 import org.testng.Assert;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
 public class SegmentAssignmentStrategyTest extends ControllerTest {
-  private final static String ZK_SERVER = ZkStarter.DEFAULT_ZK_STR;
-  private final static String TABLE_NAME_BALANCED = "testResourceBalanced";
-  private final static String TABLE_NAME_RANDOM = "testResourceRandom";
-  private final static String TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT = 
"testReplicaGroupPartitionAssignment";
-  private final static String TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP = 
"testTableLevelReplicaGroup";
-  private final static String TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP = 
"testPartitionLevelReplicaGroup";
-
-  private static final Random random = new Random();
-  private final static String PARTITION_COLUMN = "memberId";
-  private final static int NUM_REPLICA = 2;
-  private ZkClient _zkClient;
-  private final int _numServerInstance = 10;
-  private final int _numBrokerInstance = 1;
-  private ZkStarter.ZookeeperInstance _zookeeperInstance;
+  private static final String TABLE_NAME_BALANCED = "testResourceBalanced";
+  private static final String TABLE_NAME_RANDOM = "testResourceRandom";
+  private static final String TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT = 
"testReplicaGroupPartitionAssignment";
+  private static final String TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP = 
"testTableLevelReplicaGroup";
+  private static final String TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP = 
"testPartitionLevelReplicaGroup";
+  private static final String PARTITION_COLUMN = "memberId";
+  private static final int NUM_REPLICAS = 2;
+  private static final int NUM_SERVERS = 10;
+  private static final int NUM_BROKERS = 1;
+  private static final Random RANDOM = new Random();
+
   private ReplicaGroupPartitionAssignmentGenerator 
_partitionAssignmentGenerator;
 
-  @BeforeTest
-  public void setup()
+  @BeforeClass
+  public void setUp()
       throws Exception {
-    _zookeeperInstance = ZkStarter.startLocalZkServer();
-    _zkClient = new ZkClient(ZK_SERVER);
-    final String zkPath = "/" + getHelixClusterName();
-    if (_zkClient.exists(zkPath)) {
-      _zkClient.deleteRecursive(zkPath);
-    }
+    startZk();
     startController();
-    HelixManager helixZkManager = _helixResourceManager.getHelixZkManager();
-
-    _partitionAssignmentGenerator =
-        new 
ReplicaGroupPartitionAssignmentGenerator(helixZkManager.getHelixPropertyStore());
-
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZK_SERVER, _numServerInstance, true);
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), 
ZK_SERVER, _numBrokerInstance, true);
-    Thread.sleep(100);
-    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_OFFLINE").size(),
-        _numServerInstance);
-    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_REALTIME").size(),
-        _numServerInstance);
-
-    
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
 "DefaultTenant_BROKER").size(),
-        _numBrokerInstance);
-  }
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKERS, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVERS, true);
 
-  @AfterTest
-  public void tearDown() {
-    _helixResourceManager.stop();
-    _zkClient.close();
-    ZkStarter.stopLocalZkServer(_zookeeperInstance);
+    _partitionAssignmentGenerator = new 
ReplicaGroupPartitionAssignmentGenerator(_propertyStore);
   }
 
   @Test
@@ -108,7 +74,7 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
     // Adding table
     TableConfig tableConfig =
         new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME_RANDOM)
-            
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICA).build();
+            
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICAS).build();
     _helixResourceManager.addTable(tableConfig);
 
     // Wait for the table addition
@@ -125,8 +91,7 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
       while (!allSegmentsPushedToIdealState(TABLE_NAME_RANDOM, i + 1)) {
         Thread.sleep(100);
       }
-      final Set<String> taggedInstances =
-          
_helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
+      final Set<String> taggedInstances = 
_helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
       final Map<String, Integer> instanceToNumSegmentsMap = new HashMap<>();
       for (final String instance : taggedInstances) {
         instanceToNumSegmentsMap.put(instance, 0);
@@ -135,7 +100,7 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
           .getResourceIdealState(getHelixClusterName(), 
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_RANDOM));
       Assert.assertEquals(idealState.getPartitionSet().size(), i + 1);
       for (final String segmentId : idealState.getPartitionSet()) {
-        Assert.assertEquals(idealState.getInstanceStateMap(segmentId).size(), 
NUM_REPLICA);
+        Assert.assertEquals(idealState.getInstanceStateMap(segmentId).size(), 
NUM_REPLICAS);
       }
     }
   }
@@ -163,8 +128,7 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
       Thread.sleep(100);
     }
 
-    final Set<String> taggedInstances =
-        
_helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
+    final Set<String> taggedInstances = 
_helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
     final Map<String, Integer> instance2NumSegmentsMap = new HashMap<>();
     for (final String instance : taggedInstances) {
       instance2NumSegmentsMap.put(instance, 0);
@@ -177,9 +141,9 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
       }
     }
     final int totalSegments = (numSegments) * numReplicas;
-    final int minNumSegmentsPerInstance = totalSegments / _numServerInstance;
+    final int minNumSegmentsPerInstance = totalSegments / NUM_SERVERS;
     int maxNumSegmentsPerInstance = minNumSegmentsPerInstance;
-    if ((minNumSegmentsPerInstance * _numServerInstance) < totalSegments) {
+    if ((minNumSegmentsPerInstance * NUM_SERVERS) < totalSegments) {
       maxNumSegmentsPerInstance = maxNumSegmentsPerInstance + 1;
     }
     for (final String instance : instance2NumSegmentsMap.keySet()) {
@@ -200,7 +164,7 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
     // Adding a table without replica group
     TableConfig tableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
         .setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
-        
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICA).build();
+        
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICAS).build();
     _helixResourceManager.addTable(tableConfig);
 
     // Check that partition assignment does not exist
@@ -216,7 +180,7 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
     replicaGroupStrategyConfig.setMirrorAssignmentAcrossReplicaGroups(true);
 
     TableConfig replicaGroupTableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
-        
.setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT).setNumReplicas(NUM_REPLICA)
+        
.setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT).setNumReplicas(NUM_REPLICAS)
         
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy").build();
 
     
replicaGroupTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
@@ -257,13 +221,10 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
     Map<String, String> streamConfigMap = 
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
 
     // Adding a table without replica group
-    TableConfig tableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(
-        TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
-        .setSegmentAssignmentStrategy("RandomAssignmentStrategy")
-        .setNumReplicas(NUM_REPLICA)
-        .setStreamConfigs(streamConfigMap)
-        .setLLC(true)
-        .build();
+    TableConfig tableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME)
+        .setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
+        
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICAS)
+        .setStreamConfigs(streamConfigMap).setLLC(true).build();
     try {
       _helixResourceManager.addTable(tableConfig);
     } catch (Exception e) {
@@ -282,20 +243,17 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
     
replicaGroupStrategyConfig.setNumInstancesPerPartition(numInstancesPerPartition);
     replicaGroupStrategyConfig.setMirrorAssignmentAcrossReplicaGroups(true);
 
-    TableConfig replicaGroupTableConfig =
-        new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(
-            TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
-            .setNumReplicas(NUM_REPLICA)
-            
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy")
-            .setStreamConfigs(streamConfigMap)
-            .setLLC(true)
-            .build();
+    TableConfig replicaGroupTableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME)
+        
.setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT).setNumReplicas(NUM_REPLICAS)
+        
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy").setStreamConfigs(streamConfigMap)
+        .setLLC(true).build();
 
     
replicaGroupTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
 
     // Check that the replica group partition assignment is created
     try {
-      _helixResourceManager.setExistingTableConfig(replicaGroupTableConfig, 
tableNameWithType, CommonConstants.Helix.TableType.REALTIME);
+      _helixResourceManager
+          .setExistingTableConfig(replicaGroupTableConfig, tableNameWithType, 
CommonConstants.Helix.TableType.REALTIME);
     } catch (Exception e) {
       // ignore
     }
@@ -333,7 +291,7 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
 
     // Create table config
     TableConfig tableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
-        
.setTableName(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP).setNumReplicas(NUM_REPLICA)
+        
.setTableName(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP).setNumReplicas(NUM_REPLICAS)
         
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy").build();
 
     
tableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
@@ -373,9 +331,9 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
   @Test
   public void testPartitionLevelReplicaGroupSegmentAssignmentStrategy()
       throws Exception {
-    int totalPartitionNumber = random.nextInt(8) + 2;
-    int numInstancesPerPartition = random.nextInt(5) + 1;
-    int numSegments = random.nextInt(10) + 10;
+    int totalPartitionNumber = RANDOM.nextInt(8) + 2;
+    int numInstancesPerPartition = RANDOM.nextInt(5) + 1;
+    int numSegments = RANDOM.nextInt(10) + 10;
 
     // Create the configuration for segment assignment strategy.
     ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new 
ReplicaGroupStrategyConfig();
@@ -392,7 +350,7 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
 
     // Create table config
     TableConfig tableConfig = new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
-        
.setTableName(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP).setNumReplicas(NUM_REPLICA)
+        
.setTableName(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP).setNumReplicas(NUM_REPLICAS)
         
.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy").build();
 
     
tableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
@@ -436,4 +394,11 @@ public class SegmentAssignmentStrategyTest extends 
ControllerTest {
     return idealState != null && idealState.getPartitionSet() != null
         && idealState.getPartitionSet().size() == segmentNum;
   }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeInstances();
+    stopController();
+    stopZk();
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index 55c13a5..3f7097e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -21,11 +21,11 @@ package org.apache.pinot.controller.validation;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixManager;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -33,11 +33,10 @@ import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
+import org.apache.pinot.util.TestUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
@@ -54,33 +53,23 @@ import static org.testng.Assert.assertEquals;
  * Tests for the ValidationManagers.
  */
 public class ValidationManagerTest extends ControllerTest {
-  private static final String ZK_STR = ZkStarter.DEFAULT_ZK_STR;
   private static final String TEST_TABLE_NAME = "testTable";
   private static final String TEST_TABLE_TWO = "testTable2";
   private static final String TEST_SEGMENT_NAME = "testSegment";
 
-  private ZkClient _zkClient;
-
-  private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private TableConfig _offlineTableConfig;
-  private HelixManager _helixManager;
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    _zookeeperInstance = ZkStarter.startLocalZkServer();
-    _zkClient = new ZkClient(ZK_STR);
-    Thread.sleep(1000);
-
+    startZk();
     startController();
-
-    
ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(),
 ZK_STR, 2, true);
-    
ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(),
 ZK_STR, 2, true);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(2, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(2, true);
 
     _offlineTableConfig =
         new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setNumReplicas(2)
             .build();
-    _helixManager = _helixResourceManager.getHelixZkManager();
     _helixResourceManager.addTable(_offlineTableConfig);
   }
 
@@ -124,8 +113,7 @@ public class ValidationManagerTest extends ControllerTest {
   }
 
   @Test
-  public void testPushTimePersistence()
-      throws Exception {
+  public void testPushTimePersistence() {
     SegmentMetadata segmentMetadata = 
SegmentMetadataMockUtils.mockSegmentMetadata(TEST_TABLE_NAME, 
TEST_SEGMENT_NAME);
 
     _helixResourceManager.addNewSegment(TEST_TABLE_NAME, segmentMetadata, 
"http://dummy/";);
@@ -138,11 +126,16 @@ public class ValidationManagerTest extends ControllerTest 
{
     assertEquals(offlineSegmentZKMetadata.getRefreshTime(), Long.MIN_VALUE);
 
     // Refresh the segment
+    // NOTE: In order to send the refresh message, the segment need to be in 
the ExternalView
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView externalView = _helixAdmin
+          .getResourceExternalView(getHelixClusterName(), 
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME));
+      return externalView != null && 
externalView.getPartitionSet().contains(TEST_SEGMENT_NAME);
+    }, 30_000L, "Failed to find the segment in the ExternalView");
     
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
     _helixResourceManager.refreshSegment(TEST_TABLE_NAME, segmentMetadata, 
offlineSegmentZKMetadata);
 
-    offlineSegmentZKMetadata =
-        _helixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME, 
TEST_SEGMENT_NAME);
+    offlineSegmentZKMetadata = 
_helixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME, 
TEST_SEGMENT_NAME);
     // Check that the segment still has the same push time
     assertEquals(offlineSegmentZKMetadata.getPushTime(), pushTime);
     // Check that the refresh time is in the last 30 seconds
@@ -186,13 +179,6 @@ public class ValidationManagerTest extends ControllerTest {
         15);
   }
 
-  @AfterClass
-  public void shutDown() {
-    _helixResourceManager.stop();
-    _zkClient.close();
-    ZkStarter.stopLocalZkServer(_zookeeperInstance);
-  }
-
   @Test
   public void testComputeNumMissingSegments() {
     Interval jan1st = new Interval(new DateTime(2015, 1, 1, 0, 0, 0), new 
DateTime(2015, 1, 1, 23, 59, 59));
@@ -226,4 +212,11 @@ public class ValidationManagerTest extends ControllerTest {
     jan1st2nd4th5th.add(jan5th);
     
assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd4th5th,
 Duration.standardDays(1)), 1);
   }
+
+  @AfterClass
+  public void tearDown() {
+    stopFakeInstances();
+    stopController();
+    stopZk();
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 6bb7acd..c5e93bf 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -43,8 +43,8 @@ import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 
@@ -84,7 +84,6 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   private Connection _h2Connection;
   private QueryGenerator _queryGenerator;
 
-
   /**
    * The following getters can be overridden to change default settings.
    */
@@ -324,9 +323,8 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
       @Override
       public void run() {
         try {
-          ClusterIntegrationTestUtils
-              .pushAvroIntoKafka(avroFiles, 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
-                  getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), 
getPartitionColumn());
+          ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
+              getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), 
getPartitionColumn());
         } catch (Exception e) {
           // Ignored
         }
@@ -336,8 +334,8 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
 
   protected void startKafka() {
 
-    _kafkaStarters =
-        KafkaStarterUtils.startServers(getNumKafkaBrokers(), 
KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
+    _kafkaStarters = KafkaStarterUtils
+        .startServers(getNumKafkaBrokers(), 
KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
             KafkaStarterUtils.getDefaultKafkaConfiguration());
     _kafkaStarters.get(0)
         .createTopic(getKafkaTopic(), 
KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
@@ -423,14 +421,14 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
 
     addRealtimeTable(getTableName(), useLlc(), 
KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
         getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, 
timeColumnName, timeType, schemaName,
-        getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(),
-        getInvertedIndexColumns(), getBloomFilterIndexColumns(), 
getRawIndexColumns(), getTaskConfig(),
-        getStreamConsumerFactoryClassName());
+        getBrokerTenant(), getServerTenant(), getLoadMode(), 
getSortedColumn(), getInvertedIndexColumns(),
+        getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(), 
getStreamConsumerFactoryClassName());
 
     completeTableConfiguration();
   }
 
-  protected void completeTableConfiguration() throws IOException {
+  protected void completeTableConfiguration()
+      throws IOException {
     if (isUsingNewConfigFormat()) {
       CombinedConfig combinedConfig = new CombinedConfig(_offlineTableConfig, 
_realtimeTableConfig, _schema);
       sendPostRequest(_controllerRequestURLBuilder.forNewTableCreate(), 
Serializer.serializeToString(combinedConfig));
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 857a16d..83b46ba 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -141,7 +141,6 @@ public abstract class BaseClusterIntegrationTestSet extends 
BaseClusterIntegrati
             + "HAVING SUM(ArrDelay) <> 6325.973 AND AVG(CAST(CRSDepTime AS 
DOUBLE)) <= 1569.8755 OR SUM(TaxiIn) = 1003.87274"));
   }
 
-
   /**
    * Test hardcoded queries.
    * <p>NOTE:
@@ -361,14 +360,14 @@ public abstract class BaseClusterIntegrationTestSet 
extends BaseClusterIntegrati
    */
   public void testInstanceShutdown()
       throws Exception {
-    List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
+    List<String> instances = 
_helixAdmin.getInstancesInCluster(getHelixClusterName());
     Assert.assertFalse(instances.isEmpty(), "List of instances should not be 
empty");
 
     // Mark all instances in the cluster as shutting down
     for (String instance : instances) {
-      InstanceConfig instanceConfig = 
_helixAdmin.getInstanceConfig(_clusterName, instance);
+      InstanceConfig instanceConfig = 
_helixAdmin.getInstanceConfig(getHelixClusterName(), instance);
       
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS,
 true);
-      _helixAdmin.setInstanceConfig(_clusterName, instance, instanceConfig);
+      _helixAdmin.setInstanceConfig(getHelixClusterName(), instance, 
instanceConfig);
     }
 
     // Check that the routing table is empty
@@ -376,9 +375,9 @@ public abstract class BaseClusterIntegrationTestSet extends 
BaseClusterIntegrati
 
     // Mark all instances as not shutting down
     for (String instance : instances) {
-      InstanceConfig instanceConfig = 
_helixAdmin.getInstanceConfig(_clusterName, instance);
+      InstanceConfig instanceConfig = 
_helixAdmin.getInstanceConfig(getHelixClusterName(), instance);
       
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS,
 false);
-      _helixAdmin.setInstanceConfig(_clusterName, instance, instanceConfig);
+      _helixAdmin.setInstanceConfig(getHelixClusterName(), instance, 
instanceConfig);
     }
 
     // Check that the routing table is not empty
@@ -394,16 +393,16 @@ public abstract class BaseClusterIntegrationTestSet 
extends BaseClusterIntegrati
       checkForInstanceInRoutingTable(true, instanceName);
 
       // Mark the server instance as shutting down
-      InstanceConfig instanceConfig = 
_helixAdmin.getInstanceConfig(_clusterName, instanceName);
+      InstanceConfig instanceConfig = 
_helixAdmin.getInstanceConfig(getHelixClusterName(), instanceName);
       
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS,
 true);
-      _helixAdmin.setInstanceConfig(_clusterName, instanceName, 
instanceConfig);
+      _helixAdmin.setInstanceConfig(getHelixClusterName(), instanceName, 
instanceConfig);
 
       // Check that it is not in the routing table
       checkForInstanceInRoutingTable(false, instanceName);
 
       // Re-enable the server instance
       
instanceConfig.getRecord().setBooleanField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS,
 false);
-      _helixAdmin.setInstanceConfig(_clusterName, instanceName, 
instanceConfig);
+      _helixAdmin.setInstanceConfig(getHelixClusterName(), instanceName, 
instanceConfig);
 
       // Check that it is in the routing table
       checkForInstanceInRoutingTable(true, instanceName);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 60c6c64..a9dbd1d 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -60,7 +60,6 @@ import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
@@ -88,7 +87,6 @@ public abstract class ClusterTest extends ControllerTest {
   private static final Random RANDOM = new Random();
   private static final int DEFAULT_BROKER_PORT = 18099;
 
-  protected final String _clusterName = getHelixClusterName();
   protected String _brokerBaseApiUrl;
 
   private List<HelixBrokerStarter> _brokerStarters = new ArrayList<>();
@@ -129,7 +127,7 @@ public abstract class ClusterTest extends ControllerTest {
         brokerConf.setProperty(Broker.CONFIG_OF_REQUEST_HANDLER_TYPE, 
Broker.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
       }
       overrideBrokerConf(brokerConf);
-      HelixBrokerStarter brokerStarter = new HelixBrokerStarter(brokerConf, 
_clusterName, zkStr);
+      HelixBrokerStarter brokerStarter = new HelixBrokerStarter(brokerConf, 
getHelixClusterName(), zkStr, LOCAL_HOST);
       brokerStarter.start();
       _brokerStarters.add(brokerStarter);
     }
@@ -171,7 +169,7 @@ public abstract class ClusterTest extends ControllerTest {
             .setProperty(Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR, 
Server.DEFAULT_INSTANCE_SEGMENT_TAR_DIR + "-" + i);
         configuration.setProperty(Server.CONFIG_OF_ADMIN_API_PORT, 
baseAdminApiPort - i);
         configuration.setProperty(Server.CONFIG_OF_NETTY_PORT, baseNettyPort + 
i);
-        _serverStarters.add(new HelixServerStarter(_clusterName, zkStr, 
configuration));
+        _serverStarters.add(new HelixServerStarter(getHelixClusterName(), 
zkStr, configuration));
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -191,7 +189,7 @@ public abstract class ClusterTest extends ControllerTest {
         config.setProperty(Helix.Instance.INSTANCE_ID_KEY,
             Minion.INSTANCE_PREFIX + "minion" + i + "_" + 
(Minion.DEFAULT_HELIX_PORT + i));
         config.setProperty(Helix.Instance.DATA_DIR_KEY, 
Minion.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
-        MinionStarter minionStarter = new 
MinionStarter(ZkStarter.DEFAULT_ZK_STR, _clusterName, config);
+        MinionStarter minionStarter = new 
MinionStarter(ZkStarter.DEFAULT_ZK_STR, getHelixClusterName(), config);
 
         // Register task executor factories
         if (taskExecutorFactoryRegistry != null) {
@@ -499,20 +497,6 @@ public abstract class ClusterTest extends ControllerTest {
         bloomFilterColumns, noDictionaryColumns, taskConfig, 
streamConsumerFactoryName);
   }
 
-  protected void createBrokerTenant(String tenantName, int brokerCount)
-      throws Exception {
-    String request = 
ControllerRequestBuilderUtil.buildBrokerTenantCreateRequestJSON(tenantName, 
brokerCount);
-    sendPostRequest(_controllerRequestURLBuilder.forBrokerTenantCreate(), 
request);
-  }
-
-  protected void createServerTenant(String tenantName, int offlineServerCount, 
int realtimeServerCount)
-      throws Exception {
-    String request = ControllerRequestBuilderUtil
-        .buildServerTenantCreateRequestJSON(tenantName, offlineServerCount + 
realtimeServerCount, offlineServerCount,
-            realtimeServerCount);
-    sendPostRequest(_controllerRequestURLBuilder.forServerTenantCreate(), 
request);
-  }
-
   protected JsonNode getDebugInfo(final String uri)
       throws Exception {
     return JsonUtils.stringToJsonNode(sendGetRequest(_brokerBaseApiUrl + "/" + 
uri));
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index 64e2470..106ad78 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -90,15 +90,19 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
     public long getRealtimeSegmentValidationManagerInitialDelaySeconds() {
       return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
     }
+
     public long getStatusCheckerInitialDelayInSeconds() {
       return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
     }
+
     public long getRealtimeSegmentRelocationInitialDelayInSeconds() {
       return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
     }
+
     public long getBrokerResourceValidationInitialDelayInSeconds() {
       return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
     }
+
     public long getOfflineSegmentIntervalCheckerInitialDelayInSeconds() {
       return PERIODIC_TASK_INITIAL_DELAY_SECONDS;
     }
@@ -119,7 +123,8 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
    * @throws Exception
    */
   @BeforeClass
-  public void setUp() throws Exception {
+  public void setUp()
+      throws Exception {
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
 
     startZk();
@@ -159,7 +164,8 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
   /**
    * Setup offline table, but no segments
    */
-  private void setupOfflineTable(String table) throws Exception {
+  private void setupOfflineTable(String table)
+      throws Exception {
     _realtimeTableConfig = null;
     addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, 
SegmentVersion.v1, null, null, null, null, null);
     completeTableConfiguration();
@@ -168,7 +174,8 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
   /**
    * Setup offline table, with segments from avro
    */
-  private void setupOfflineTableAndSegments(String tableName, List<File> 
avroFiles) throws Exception {
+  private void setupOfflineTableAndSegments(String tableName, List<File> 
avroFiles)
+      throws Exception {
     TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
     setTableName(tableName);
     _realtimeTableConfig = null;
@@ -184,12 +191,13 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
     Assert.assertNotNull(outgoingTimeUnit);
     String timeType = outgoingTimeUnit.toString();
 
-    addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME, 
TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null);
+    addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME, 
TENANT_NAME, null, SegmentVersion.v1, null, null,
+        null, null, null);
     completeTableConfiguration();
 
     ExecutorService executor = Executors.newCachedThreadPool();
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0, 
_segmentDir, _tarDir, tableName, false,
-        null, null, null, executor);
+    ClusterIntegrationTestUtils
+        .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName, 
false, null, null, null, executor);
     executor.shutdown();
     executor.awaitTermination(10, TimeUnit.MINUTES);
     uploadSegments(getTableName(), _tarDir);
@@ -199,7 +207,8 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
   /**
    * Setup realtime table for given tablename and topic
    */
-  private void setupRealtimeTable(String table, String  topic, File avroFile) 
throws Exception {
+  private void setupRealtimeTable(String table, String topic, File avroFile)
+      throws Exception {
     _offlineTableConfig = null;
     File schemaFile = getSchemaFile();
     Schema schema = Schema.fromFile(schemaFile);
@@ -233,7 +242,8 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
    * @throws Exception
    */
   @BeforeGroups(groups = "segmentStatusChecker")
-  public void beforeTestSegmentStatusCheckerTest(ITestContext context) throws 
Exception {
+  public void beforeTestSegmentStatusCheckerTest(ITestContext context)
+      throws Exception {
     String emptyTable = "table1_OFFLINE";
     String disabledOfflineTable = "table2_OFFLINE";
     String basicOfflineTable = getDefaultOfflineTableName();
@@ -253,7 +263,7 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
 
     // table with disabled ideal state
     setupOfflineTable(disabledOfflineTable);
-    _helixAdmin.enableResource(_clusterName, disabledOfflineTable, false);
+    _helixAdmin.enableResource(getHelixClusterName(), disabledOfflineTable, 
false);
 
     // some segments offline
     setupOfflineTableAndSegments(errorOfflineTable, _avroFiles);
@@ -282,7 +292,8 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
    * Validate that we are seeing the expected numbers
    */
   @Test(groups = "segmentStatusChecker")
-  public void testSegmentStatusChecker(ITestContext context) throws Exception {
+  public void testSegmentStatusChecker(ITestContext context)
+      throws Exception {
     String emptyTable = (String) context.getAttribute("emptyTable");
     String disabledOfflineTable = (String) 
context.getAttribute("disabledOfflineTable");
     String basicOfflineTable = (String) 
context.getAttribute("basicOfflineTable");
@@ -292,9 +303,9 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
 
     ControllerMetrics controllerMetrics = 
_controllerStarter.getControllerMetrics();
 
-    TestUtils.waitForCondition(input ->
-        
controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
-            "SegmentStatusChecker") >= numTables, 240_000, "Timed out waiting 
for SegmentStatusChecker");
+    TestUtils.waitForCondition(input -> controllerMetrics
+            
.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, 
"SegmentStatusChecker") >= numTables,
+        240_000, "Timed out waiting for SegmentStatusChecker");
 
     // empty table - table1_OFFLINE
     // num replicas set from ideal state
@@ -344,7 +355,8 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
   }
 
   @AfterGroups(groups = "segmentStatusChecker")
-  public void afterTestSegmentStatusChecker(ITestContext context) throws 
Exception {
+  public void afterTestSegmentStatusChecker(ITestContext context)
+      throws Exception {
     String emptyTable = (String) context.getAttribute("emptyTable");
     String disabledOfflineTable = (String) 
context.getAttribute("disabledOfflineTable");
     String errorOfflineTable = (String) 
context.getAttribute("errorOfflineTable");
@@ -361,7 +373,8 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
    * @throws Exception
    */
   @BeforeGroups(groups = "realtimeSegmentRelocator", dependsOnGroups = 
"segmentStatusChecker")
-  public void beforeRealtimeSegmentRelocatorTest(ITestContext context) throws 
Exception {
+  public void beforeRealtimeSegmentRelocatorTest(ITestContext context)
+      throws Exception {
     String relocationTable = getDefaultRealtimeTableName();
     context.setAttribute("relocationTable", relocationTable);
 
@@ -377,20 +390,22 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
   }
 
   @Test(groups = "realtimeSegmentRelocator", dependsOnGroups = 
"segmentStatusChecker")
-  public void testRealtimeSegmentRelocator(ITestContext context) throws 
Exception {
+  public void testRealtimeSegmentRelocator(ITestContext context)
+      throws Exception {
 
     String relocationTable = (String) context.getAttribute("relocationTable");
 
     ControllerMetrics controllerMetrics = 
_controllerStarter.getControllerMetrics();
 
-    long taskRunCount = 
controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator",
-        ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
+    long taskRunCount =
+        controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator", 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
+            .count();
     TestUtils.waitForCondition(input ->
         controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator", 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
             .count() > taskRunCount, 60_000, "Timed out waiting for 
RealtimeSegmentRelocation to run");
 
-    
Assert.assertTrue(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
-        "RealtimeSegmentRelocator") > 0);
+    Assert.assertTrue(controllerMetrics
+        
.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, 
"RealtimeSegmentRelocator") > 0);
 
     // check servers for ONLINE segment and CONSUMING segments are disjoint 
sets
     Set<String> consuming = new HashSet<>();
@@ -425,7 +440,8 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
     // Check that the first table we added doesn't need to be rebuilt(case 
where ideal state brokers and brokers in broker resource are the same.
     String table1 = (String) context.getAttribute("testTableOne");
     String table2 = (String) context.getAttribute("testTableTwo");
-    TableConfig tableConfigOne = new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(table1).build();
+    TableConfig tableConfigOne =
+        new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(table1).build();
     String partitionNameOne = tableConfigOne.getTableName();
 
     // Ensure that the broker resource is not rebuilt.
@@ -517,27 +533,29 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
    * @throws Exception
    */
 
-  @Test(groups="realtimeSegmentValidationManager", dependsOnGroups = 
"offlineSegmentIntervalChecker")
-  public void testRealtimeSegmentValidationManager(ITestContext context) 
throws Exception {
+  @Test(groups = "realtimeSegmentValidationManager", dependsOnGroups = 
"offlineSegmentIntervalChecker")
+  public void testRealtimeSegmentValidationManager(ITestContext context)
+      throws Exception {
     ControllerMetrics controllerMetrics = 
_controllerStarter.getControllerMetrics();
-    long taskRunCount = 
controllerMetrics.getMeteredTableValue("RealtimeSegmentValidationManager",
-        ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
+    long taskRunCount = controllerMetrics
+        .getMeteredTableValue("RealtimeSegmentValidationManager", 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
 
     // Wait until the RealtimeSegmentValidationManager runs at least once. 
Most likely it already ran once
     // on the realtime table (default one) already setup, so we should have 
the total document count on that
     // realtime table.
-    TestUtils.waitForCondition(input ->
-        
controllerMetrics.getMeteredTableValue("RealtimeSegmentValidationManager", 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
-            .count() > taskRunCount, 60_000, "Timed out waiting for 
RealtimeSegmentValidationManager to run");
+    TestUtils.waitForCondition(input -> controllerMetrics
+        .getMeteredTableValue("RealtimeSegmentValidationManager", 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count()
+        > taskRunCount, 60_000, "Timed out waiting for 
RealtimeSegmentValidationManager to run");
 
-    
Assert.assertTrue(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
-        "RealtimeSegmentValidationManager") > 0);
+    Assert.assertTrue(controllerMetrics
+        
.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, 
"RealtimeSegmentValidationManager")
+        > 0);
     RealtimeSegmentValidationManager validationManager = 
_controllerStarter.getRealtimeSegmentValidationManager();
     ValidationMetrics validationMetrics = 
validationManager.getValidationMetrics();
     // Make sure we processed the realtime table to get the total document 
count. Should have been done the first
     // time RealtimeSegmentValidationManager ran on the default realtime table.
-    
Assert.assertTrue(validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(getDefaultRealtimeTableName(),
-        "TotalDocumentCount")) > 0);
+    Assert.assertTrue(validationMetrics
+        
.getValueOfGauge(ValidationMetrics.makeGaugeName(getDefaultRealtimeTableName(), 
"TotalDocumentCount")) > 0);
   }
 
   // TODO: tests for other ControllerPeriodicTasks (RetentionManagert , 
RealtimeSegmentValidationManager)
@@ -565,7 +583,8 @@ public class ControllerPeriodicTasksIntegrationTests 
extends BaseClusterIntegrat
    * @throws Exception
    */
   @AfterClass
-  public void tearDown() throws Exception {
+  public void tearDown()
+      throws Exception {
     stopServer();
     stopBroker();
     stopController();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index dce6a5e..714986c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -136,17 +136,17 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   }
 
   private void registerCallbackHandlers() {
-    List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
+    List<String> instances = 
_helixAdmin.getInstancesInCluster(getHelixClusterName());
     instances.removeIf(instance -> 
(!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && 
!instance
         .startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)));
-    List<String> resourcesInCluster = 
_helixAdmin.getResourcesInCluster(_clusterName);
+    List<String> resourcesInCluster = 
_helixAdmin.getResourcesInCluster(getHelixClusterName());
     resourcesInCluster.removeIf(
         resource -> (!TableNameBuilder.isTableResource(resource) && 
!CommonConstants.Helix.BROKER_RESOURCE_INSTANCE
             .equals(resource)));
     for (String instance : instances) {
       List<String> resourcesToMonitor = new ArrayList<>();
       for (String resourceName : resourcesInCluster) {
-        IdealState idealState = 
_helixAdmin.getResourceIdealState(_clusterName, resourceName);
+        IdealState idealState = 
_helixAdmin.getResourceIdealState(getHelixClusterName(), resourceName);
         for (String partitionName : idealState.getPartitionSet()) {
           if (idealState.getInstanceSet(partitionName).contains(instance)) {
             resourcesToMonitor.add(resourceName);
@@ -155,10 +155,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
         }
       }
       _serviceStatusCallbacks.add(new 
ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
-          .of(new 
ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager,
 _clusterName,
-                  instance, resourcesToMonitor, 100.0),
-              new 
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager,
 _clusterName,
-                  instance, resourcesToMonitor, 100.0))));
+          .of(new 
ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager,
+                  getHelixClusterName(), instance, resourcesToMonitor, 100.0),
+              new 
ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager,
+                  getHelixClusterName(), instance, resourcesToMonitor, 
100.0))));
     }
   }
 
@@ -741,11 +741,11 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     // TODO: Add test to delete broker instance. Currently, stopBroker() does 
not work correctly.
 
     // Check if '/INSTANCES/<serverName>' has been erased correctly
-    String instancePath = "/" + _clusterName + "/INSTANCES/" + serverName;
+    String instancePath = "/" + getHelixClusterName() + "/INSTANCES/" + 
serverName;
     assertFalse(_propertyStore.exists(instancePath, 0));
 
     // Check if '/CONFIGS/PARTICIPANT/<serverName>' has been erased correctly
-    String configPath = "/" + _clusterName + "/CONFIGS/PARTICIPANT/" + 
serverName;
+    String configPath = "/" + getHelixClusterName() + "/CONFIGS/PARTICIPANT/" 
+ serverName;
     assertFalse(_propertyStore.exists(configPath, 0));
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
index 560bf5a..84ea948 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
@@ -97,14 +97,14 @@ public class SegmentCompletionIntegrationTest extends 
BaseClusterIntegrationTest
 
     // Create server instance with the fake server state model
     _serverHelixManager = HelixManagerFactory
-        .getZKHelixManager(_clusterName, _serverInstance, 
InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
+        .getZKHelixManager(getHelixClusterName(), _serverInstance, 
InstanceType.PARTICIPANT, ZkStarter.DEFAULT_ZK_STR);
     _serverHelixManager.getStateMachineEngine()
         
.registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(),
             new FakeServerSegmentStateModelFactory());
     _serverHelixManager.connect();
 
     // Add Helix tag to the server
-    _serverHelixManager.getClusterManagmentTool().addInstanceTag(_clusterName, 
_serverInstance,
+    
_serverHelixManager.getClusterManagmentTool().addInstanceTag(getHelixClusterName(),
 _serverInstance,
         
TableNameBuilder.REALTIME.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
 
     // Initialize controller leader locator
@@ -127,7 +127,7 @@ public class SegmentCompletionIntegrationTest extends 
BaseClusterIntegrationTest
       @Override
       public Boolean apply(@Nullable Void aVoid) {
         try {
-          ExternalView externalView = 
_helixAdmin.getResourceExternalView(_clusterName, realtimeTableName);
+          ExternalView externalView = 
_helixAdmin.getResourceExternalView(getHelixClusterName(), realtimeTableName);
           Map<String, String> stateMap = 
externalView.getStateMap(_currentSegment);
           return stateMap.get(_serverInstance)
               
.equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE);
@@ -152,7 +152,7 @@ public class SegmentCompletionIntegrationTest extends 
BaseClusterIntegrationTest
       @Override
       public Boolean apply(@Nullable Void aVoid) {
         try {
-          ExternalView externalView = 
_helixAdmin.getResourceExternalView(_clusterName, realtimeTableName);
+          ExternalView externalView = 
_helixAdmin.getResourceExternalView(getHelixClusterName(), realtimeTableName);
           Map<String, String> stateMap = 
externalView.getStateMap(_currentSegment);
           return 
stateMap.get(_serverInstance).equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.OFFLINE_STATE);
         } catch (Exception e) {
@@ -175,7 +175,7 @@ public class SegmentCompletionIntegrationTest extends 
BaseClusterIntegrationTest
       public Boolean apply(@Nullable Void aVoid) {
         try {
           if (!_currentSegment.equals(oldSegment)) {
-            ExternalView externalView = 
_helixAdmin.getResourceExternalView(_clusterName, realtimeTableName);
+            ExternalView externalView = 
_helixAdmin.getResourceExternalView(getHelixClusterName(), realtimeTableName);
             Map<String, String> stateMap = 
externalView.getStateMap(_currentSegment);
             return
                 
stateMap.get(_serverInstance).equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
index 6fd2ec6..e69f275 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java
@@ -300,10 +300,8 @@ public class HelixServerStarter {
     List<String> instanceTags = instanceConfig.getTags();
     if (instanceTags == null || instanceTags.size() == 0) {
       if 
(ZKMetadataProvider.getClusterTenantIsolationEnabled(_helixManager.getHelixPropertyStore()))
 {
-        _helixAdmin.addInstanceTag(clusterName, instanceName,
-            
TableNameBuilder.OFFLINE.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
-        _helixAdmin.addInstanceTag(clusterName, instanceName,
-            
TableNameBuilder.REALTIME.tableNameWithType(TagNameUtils.DEFAULT_TENANT_NAME));
+        _helixAdmin.addInstanceTag(clusterName, instanceName, 
TagNameUtils.getOfflineTagForTenant(null));
+        _helixAdmin.addInstanceTag(clusterName, instanceName, 
TagNameUtils.getRealtimeTagForTenant(null));
       } else {
         _helixAdmin.addInstanceTag(clusterName, instanceName, 
UNTAGGED_SERVER_INSTANCE);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to