This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e32a5b1  Add ControllerPeriodicTask integration tests (#4130)
e32a5b1 is described below

commit e32a5b1634f221e31bd8dc49a2284b27a1bb4221
Author: Neha Pawar <[email protected]>
AuthorDate: Wed Apr 17 12:56:07 2019 -0700

    Add ControllerPeriodicTask integration tests (#4130)
    
    Refactor SegmentStatusCheckerIntegrationTest to be able to handle all 
daemons in same test
    Add integration test for RealtimeSegmentRelocator periodic task
---
 .../apache/pinot/controller/ControllerConf.java    |  12 +
 .../core/relocation/RealtimeSegmentRelocator.java  |   2 +-
 .../tests/BaseClusterIntegrationTest.java          |  15 +
 .../pinot/integration/tests/ClusterTest.java       |  10 +
 .../ControllerPeriodicTasksIntegrationTests.java   | 422 +++++++++++++++++++++
 .../tests/HybridClusterIntegrationTest.java        |  10 +-
 .../tests/RealtimeClusterIntegrationTest.java      |  12 +-
 .../tasks/SegmentStatusCheckerIntegrationTest.java | 288 --------------
 8 files changed, 463 insertions(+), 308 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 1faa949..2372ac0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -96,6 +96,8 @@ public class ControllerConf extends PropertiesConfiguration {
         "controller.retentionManager.initialDelayInSeconds";
     private static final String 
OFFLINE_SEGMENT_INTERVAL_CHECKER_INITIAL_DELAY_IN_SECONDS =
         "controller.offlineSegmentIntervalChecker.initialDelayInSeconds";
+    private static final String 
REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS =
+        "controller.realtimeSegmentRelocation.initialDelayInSeconds";
 
     public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
     public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
@@ -593,6 +595,11 @@ public class ControllerConf extends 
PropertiesConfiguration {
         ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
   }
 
+  public long getRealtimeSegmentRelocationInitialDelayInSeconds() {
+    return 
getLong(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS,
+        ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+  }
+
   public long getOfflineSegmentIntervalCheckerInitialDelayInSeconds() {
     return 
getLong(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_INITIAL_DELAY_IN_SECONDS,
         ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
@@ -602,6 +609,11 @@ public class ControllerConf extends 
PropertiesConfiguration {
     
setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS,
 initialDelayInSeconds);
   }
 
+  public void setRealtimeSegmentRelocationInitialDelayInSeconds(long 
initialDelayInSeconds) {
+    
setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS,
+        initialDelayInSeconds);
+  }
+
   public long getPeriodicTaskInitialDelayInSeconds() {
     return ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds();
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index 0a9e352..ea5b05b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,7 +58,7 @@ public class RealtimeSegmentRelocator extends 
ControllerPeriodicTask<Void> {
   public RealtimeSegmentRelocator(PinotHelixResourceManager 
pinotHelixResourceManager, ControllerConf config,
       ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentRelocator", 
getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
-        config.getPeriodicTaskInitialDelayInSeconds(), 
pinotHelixResourceManager, controllerMetrics);
+        config.getRealtimeSegmentRelocationInitialDelayInSeconds(), 
pinotHelixResourceManager, controllerMetrics);
   }
 
   @Override
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 36e67ab..fed0b68 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.concurrent.Executor;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.ConnectionFactory;
 import org.apache.pinot.common.config.TableTaskConfig;
@@ -68,6 +69,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   protected final File _avroDir = new File(_tempDir, "avroDir");
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
+  protected List<KafkaServerStartable> _kafkaStarters;
 
   private org.apache.pinot.client.Connection _pinotConnection;
   private Connection _h2Connection;
@@ -311,6 +313,19 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
     });
   }
 
+  protected void startKafka() {
+    _kafkaStarters = KafkaStarterUtils
+        .startServers(getNumKafkaBrokers(), 
KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
+            KafkaStarterUtils.getDefaultKafkaConfiguration());
+    KafkaStarterUtils.createTopic(getKafkaTopic(), 
KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions());
+  }
+
+  protected void stopKafka() {
+    for (KafkaServerStartable kafkaStarter : _kafkaStarters) {
+      KafkaStarterUtils.stopServer(kafkaStarter);
+    }
+  }
+
   /**
    * Get current result for "SELECT COUNT(*)".
    *
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 5d9ba61..d127779 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -50,6 +50,7 @@ import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TableTaskConfig;
+import org.apache.pinot.common.config.TenantConfig;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.CommonConstants.Broker;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
@@ -463,6 +464,15 @@ public abstract class ClusterTest extends ControllerTest {
         _realtimeTableConfig.toJsonConfigString());
   }
 
+  protected void updateRealtimeTableTenant(String tableName, TenantConfig 
tenantConfig)
+      throws Exception {
+
+    _realtimeTableConfig.setTenantConfig(tenantConfig);
+
+    
sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName),
+        _realtimeTableConfig.toJsonConfigString());
+  }
+
   protected void dropRealtimeTable(String tableName)
       throws Exception {
     sendDeleteRequest(
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
new file mode 100644
index 0000000..9535cf6
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import kafka.server.KafkaServerStartable;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TagOverrideConfig;
+import org.apache.pinot.common.config.TenantConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.KafkaStarterUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.retry.RetryPolicies;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterGroups;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeGroups;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration tests for all {@link 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask}
+ * The intention of these tests is not to test functionality of daemons,
+ * but simply to check that they run as expected and process the tables when 
the controller starts.
+ *
+ * Cluster setup/teardown is common across all tests in the @BeforeClass 
method {@link ControllerPeriodicTasksIntegrationTests#setUp()}.
+ * This includes:
+ * zk, controller, 1 broker, 3 offline servers, 3 realtime servers, kafka with 
avro loaded, offline table with segments from avro
+ *
+ * There will be a separate beforeTask(), testTask() and afterTask() for each 
ControllerPeriodicTask test, grouped by task name.
+ * See group = "segmentStatusChecker" for example.
+ * The tables needed for the test will be created in beforeTask(), and dropped 
in afterTask()
+ *
+ * The groups run sequentially in the order: segmentStatusChecker -> 
realtimeSegmentRelocation -> ....
+ */
+public class ControllerPeriodicTasksIntegrationTests extends 
BaseClusterIntegrationTestSet {
+
+  private static final String TENANT_NAME = "TestTenant";
+  private static final String DEFAULT_TABLE_NAME = "mytable";
+
+  private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 60;
+  private static final int PERIODIC_TASK_FREQ_SECONDS = 5;
+  private static final String PERIODIC_TASK_FREQ = "5s";
+
+  private String _currentTableName;
+  private List<File> _avroFiles;
+
+  /**
+   * Setup the cluster for the tests
+   * @throws Exception
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    startZk();
+    startKafka();
+
+    // Set initial delay of 60 seconds for periodic tasks, to allow time for 
tables setup.
+    // Run at 5 seconds freq in order to keep them running, in case first run 
happens before table setup
+    ControllerConf controllerConf = getDefaultControllerConfiguration();
+    controllerConf.setTenantIsolationEnabled(false);
+    
controllerConf.setStatusCheckerInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
+    
controllerConf.setStatusCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
+    
controllerConf.setRealtimeSegmentRelocationInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
+    controllerConf.setRealtimeSegmentRelocatorFrequency(PERIODIC_TASK_FREQ);
+
+    startController(controllerConf);
+    startBroker();
+    startServers(6);
+
+    // Create tenants
+    createBrokerTenant(TENANT_NAME, 1);
+    createServerTenant(TENANT_NAME, 3, 3);
+
+    // unpack avro into _tempDir
+    _avroFiles = unpackAvroData(_tempDir);
+
+    // setup a default offline table, shared across all tests. Each test can 
create additional tables and destroy them
+    setupOfflineTableAndSegments(DEFAULT_TABLE_NAME, _avroFiles);
+
+    // push avro into kafka, each test can create the realtime table and 
destroy it
+    ExecutorService executor = Executors.newCachedThreadPool();
+    pushAvroIntoKafka(_avroFiles, getKafkaTopic(), executor);
+    executor.shutdown();
+    executor.awaitTermination(10, TimeUnit.MINUTES);
+  }
+
+  /**
+   * Setup offline table, but no segments
+   */
+  private void setupOfflineTable(String table) throws Exception {
+    _realtimeTableConfig = null;
+    addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, 
SegmentVersion.v1, null, null, null);
+    completeTableConfiguration();
+  }
+
+  /**
+   * Setup offline table, with segments from avro
+   */
+  private void setupOfflineTableAndSegments(String table, List<File> 
avroFiles) throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
+    setTableName(table);
+    _realtimeTableConfig = null;
+
+    File schemaFile = getSchemaFile();
+    Schema schema = Schema.fromFile(schemaFile);
+    String schemaName = schema.getSchemaName();
+    addSchema(schemaFile, schemaName);
+
+    String timeColumnName = schema.getTimeColumnName();
+    Assert.assertNotNull(timeColumnName);
+    TimeUnit outgoingTimeUnit = schema.getOutgoingTimeUnit();
+    Assert.assertNotNull(outgoingTimeUnit);
+    String timeType = outgoingTimeUnit.toString();
+
+    addOfflineTable(table, timeColumnName, timeType, TENANT_NAME, TENANT_NAME, 
null, SegmentVersion.v1, null, null, null);
+    completeTableConfiguration();
+
+    ExecutorService executor = Executors.newCachedThreadPool();
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0, 
_segmentDir, _tarDir, table, false,
+        null, null, null, executor);
+    executor.shutdown();
+    executor.awaitTermination(10, TimeUnit.MINUTES);
+    uploadSegments(_tarDir);
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  /**
+   * Setup realtime table for given table name and topic
+   */
+  private void setupRealtimeTable(String table, String  topic, File avroFile) 
throws Exception {
+    _offlineTableConfig = null;
+    File schemaFile = getSchemaFile();
+    Schema schema = Schema.fromFile(schemaFile);
+    String schemaName = schema.getSchemaName();
+    addSchema(schemaFile, schemaName);
+
+    String timeColumnName = schema.getTimeColumnName();
+    Assert.assertNotNull(timeColumnName);
+    TimeUnit outgoingTimeUnit = schema.getOutgoingTimeUnit();
+    Assert.assertNotNull(outgoingTimeUnit);
+    String timeType = outgoingTimeUnit.toString();
+
+    addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, 
KafkaStarterUtils.DEFAULT_ZK_STR, topic,
+        getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, 
schemaName, TENANT_NAME, TENANT_NAME,
+        getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), 
getBloomFilterIndexColumns(), getRawIndexColumns(),
+        getTaskConfig(), getStreamConsumerFactoryClassName());
+    completeTableConfiguration();
+  }
+
+  @Override
+  public String getTableName() {
+    return _currentTableName;
+  }
+
+  private void setTableName(String tableName) {
+    _currentTableName = tableName;
+  }
+
+  /**
+   * Group - segmentStatusChecker - Integration tests for {@link 
org.apache.pinot.controller.helix.SegmentStatusChecker}
+   * @throws Exception
+   */
+  @BeforeGroups(groups = "segmentStatusChecker")
+  public void beforeTestSegmentStatusCheckerTest(ITestContext context) throws 
Exception {
+    String emptyTable = "table1_OFFLINE";
+    String disabledOfflineTable = "table2_OFFLINE";
+    String basicOfflineTable = getDefaultOfflineTableName();
+    String errorOfflineTable = "table4_OFFLINE";
+    String basicRealtimeTable = getDefaultRealtimeTableName();
+    int numTables = 5;
+
+    context.setAttribute("emptyTable", emptyTable);
+    context.setAttribute("disabledOfflineTable", disabledOfflineTable);
+    context.setAttribute("basicOfflineTable", basicOfflineTable);
+    context.setAttribute("errorOfflineTable", errorOfflineTable);
+    context.setAttribute("basicRealtimeTable", basicRealtimeTable);
+    context.setAttribute("numTables", numTables);
+
+    // empty table
+    setupOfflineTable(emptyTable);
+
+    // table with disabled ideal state
+    setupOfflineTable(disabledOfflineTable);
+    _helixAdmin.enableResource(_clusterName, disabledOfflineTable, false);
+
+    // some segments offline
+    setupOfflineTableAndSegments(errorOfflineTable, _avroFiles);
+    HelixHelper.updateIdealState(_helixManager, errorOfflineTable, new 
Function<IdealState, IdealState>() {
+      @Nullable
+      @Override
+      public IdealState apply(@Nullable IdealState input) {
+        List<String> segmentNames = 
Lists.newArrayList(input.getPartitionSet());
+        Collections.sort(segmentNames);
+
+        Map<String, String> instanceStateMap1 = 
input.getInstanceStateMap(segmentNames.get(0));
+        for (String instance : instanceStateMap1.keySet()) {
+          instanceStateMap1.put(instance, "OFFLINE");
+          break;
+        }
+        return input;
+      }
+    }, RetryPolicies.fixedDelayRetryPolicy(2, 10));
+
+    // setup default realtime table
+    setupRealtimeTable(basicRealtimeTable, getKafkaTopic(), _avroFiles.get(0));
+  }
+
+  /**
+   * After 1 run of SegmentStatusChecker the controllerMetrics will be set for 
each table
+   * Validate that we are seeing the expected numbers
+   */
+  @Test(groups = "segmentStatusChecker")
+  public void testSegmentStatusChecker(ITestContext context) throws Exception {
+    String emptyTable = (String) context.getAttribute("emptyTable");
+    String disabledOfflineTable = (String) 
context.getAttribute("disabledOfflineTable");
+    String basicOfflineTable = (String) 
context.getAttribute("basicOfflineTable");
+    String errorOfflineTable = (String) 
context.getAttribute("errorOfflineTable");
+    String basicRealtimeTable = (String) 
context.getAttribute("basicRealtimeTable");
+    int numTables = (int) context.getAttribute("numTables");
+
+    ControllerMetrics controllerMetrics = 
_controllerStarter.getControllerMetrics();
+
+    TestUtils.waitForCondition(input ->
+        
controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
+            "SegmentStatusChecker") >= numTables, 240_000, "Timed out waiting 
for SegmentStatusChecker");
+
+    // empty table - table1_OFFLINE
+    // num replicas set from ideal state
+    checkSegmentStatusCheckerMetrics(controllerMetrics, emptyTable, null, 3, 
100, 0, 100);
+
+    // disabled table - table2_OFFLINE
+    // reset to defaults
+    checkSegmentStatusCheckerMetrics(controllerMetrics, disabledOfflineTable, 
null, Long.MIN_VALUE, Long.MIN_VALUE,
+        Long.MIN_VALUE, Long.MIN_VALUE);
+
+    // happy path table - mytable_OFFLINE
+    IdealState idealState = 
_helixResourceManager.getTableIdealState(basicOfflineTable);
+    checkSegmentStatusCheckerMetrics(controllerMetrics, basicOfflineTable, 
idealState, 3, 100, 0, 100);
+
+    // offline segments - table4_OFFLINE
+    // 2 replicas available out of 3, percent 66
+    idealState = _helixResourceManager.getTableIdealState(errorOfflineTable);
+    checkSegmentStatusCheckerMetrics(controllerMetrics, errorOfflineTable, 
idealState, 2, 66, 0, 100);
+
+    // happy path table - mytable_REALTIME
+    idealState = _helixResourceManager.getTableIdealState(basicRealtimeTable);
+    checkSegmentStatusCheckerMetrics(controllerMetrics, basicRealtimeTable, 
idealState, 1, 100, 0, 100);
+
+    // Total metrics
+    
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT),
 4);
+    
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT),
 1);
+    
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
 1);
+  }
+
+  private void checkSegmentStatusCheckerMetrics(ControllerMetrics 
controllerMetrics, String tableName,
+      IdealState idealState, long numReplicas, long percentReplicas, long 
segmentsInErrorState,
+      long percentSegmentsAvailable) {
+    if (idealState != null) {
+      Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.IDEALSTATE_ZNODE_SIZE),
+          idealState.toString().length());
+      Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENT_COUNT),
+          (long) (idealState.getPartitionSet().size()));
+    }
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.NUMBER_OF_REPLICAS),
+        numReplicas);
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.PERCENT_OF_REPLICAS),
+        percentReplicas);
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
+        segmentsInErrorState);
+    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
+        percentSegmentsAvailable);
+  }
+
+  @AfterGroups(groups = "segmentStatusChecker")
+  public void afterTestSegmentStatusChecker(ITestContext context) throws 
Exception {
+    String emptyTable = (String) context.getAttribute("emptyTable");
+    String disabledOfflineTable = (String) 
context.getAttribute("disabledOfflineTable");
+    String errorOfflineTable = (String) 
context.getAttribute("errorOfflineTable");
+    String basicRealtimeTable = (String) 
context.getAttribute("basicRealtimeTable");
+
+    dropOfflineTable(emptyTable);
+    dropOfflineTable(disabledOfflineTable);
+    dropOfflineTable(errorOfflineTable);
+    dropOfflineTable(basicRealtimeTable);
+  }
+
+  /**
+   * Group - realtimeSegmentRelocator - Integration tests for {@link 
org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator}
+   * @param context
+   * @throws Exception
+   */
+  @BeforeGroups(groups = "realtimeSegmentRelocator", dependsOnGroups = 
"segmentStatusChecker")
+  public void beforeRealtimeSegmentRelocatorTest(ITestContext context) throws 
Exception {
+    String relocationTable = getDefaultRealtimeTableName();
+    context.setAttribute("relocationTable", relocationTable);
+
+    // setup default realtime table
+    setupRealtimeTable(relocationTable, getKafkaTopic(), _avroFiles.get(0));
+
+    // add tag override for relocation
+    TenantConfig tenantConfig = new TenantConfig();
+    tenantConfig.setServer(TENANT_NAME);
+    tenantConfig.setBroker(TENANT_NAME);
+    TagOverrideConfig tagOverrideConfig = new TagOverrideConfig();
+    tagOverrideConfig.setRealtimeConsuming(TENANT_NAME + "_REALTIME");
+    tagOverrideConfig.setRealtimeCompleted(TENANT_NAME + "_OFFLINE");
+    tenantConfig.setTagOverrideConfig(tagOverrideConfig);
+    
updateRealtimeTableTenant(TableNameBuilder.extractRawTableName(relocationTable),
 tenantConfig);
+  }
+
+  @Test(groups = "realtimeSegmentRelocator", dependsOnGroups = 
"segmentStatusChecker")
+  public void testRealtimeSegmentRelocator(ITestContext context) throws 
Exception {
+
+    String relocationTable = (String) context.getAttribute("relocationTable");
+
+    ControllerMetrics controllerMetrics = 
_controllerStarter.getControllerMetrics();
+
+    long taskRunCount = 
controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator",
+        ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
+    TestUtils.waitForCondition(input ->
+        controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator", 
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
+            .count() > taskRunCount, 60_000, "Timed out waiting for 
RealtimeSegmentRelocation to run");
+
+    
Assert.assertTrue(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
+        "RealtimeSegmentRelocator") > 0);
+
+    // check servers for ONLINE segment and CONSUMING segments are disjoint 
sets
+    Set<String> consuming = new HashSet<>();
+    Set<String> completed = new HashSet<>();
+    IdealState tableIdealState = 
_helixResourceManager.getTableIdealState(relocationTable);
+    for (String partition : tableIdealState.getPartitionSet()) {
+      Map<String, String> instanceStateMap = 
tableIdealState.getInstanceStateMap(partition);
+      if (instanceStateMap.containsValue("CONSUMING")) {
+        consuming.addAll(instanceStateMap.keySet());
+      }
+      if (instanceStateMap.containsValue("ONLINE")) {
+        completed.addAll(instanceStateMap.keySet());
+      }
+    }
+
+    Assert.assertTrue(Collections.disjoint(consuming, completed));
+  }
+
+  @AfterGroups(groups = "realtimeSegmentRelocator", dependsOnGroups = 
"segmentStatusChecker")
+  public void afterRealtimeSegmentRelocatorTest(ITestContext context) throws 
Exception {
+    String relocationTable = (String) context.getAttribute("relocationTable");
+    dropRealtimeTable(relocationTable);
+  }
+
+  // TODO: tests for other ControllerPeriodicTasks (RetentionManager, 
BrokerValidationManager, OfflineSegmentIntervalChecker, 
RealtimeSegmentValidationManager)
+
+  @Override
+  protected boolean isUsingNewConfigFormat() {
+    return true;
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  private String getDefaultOfflineTableName() {
+    return DEFAULT_TABLE_NAME + "_OFFLINE";
+  }
+
+  private String getDefaultRealtimeTableName() {
+    return DEFAULT_TABLE_NAME + "_REALTIME";
+  }
+
+  /**
+   * Tear down the cluster after tests
+   * @throws Exception
+   */
+  @AfterClass
+  public void tearDown() throws Exception {
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index c077e8a..fa88396 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -51,7 +51,6 @@ public class HybridClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
   private static final int NUM_OFFLINE_SEGMENTS = 8;
   private static final int NUM_REALTIME_SEGMENTS = 6;
 
-  private KafkaServerStartable _kafkaStarter;
   private Schema _schema;
 
   protected int getNumOfflineSegments() {
@@ -109,12 +108,7 @@ public class HybridClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
       throws Exception {
     // Start Zk and Kafka
     startZk();
-    _kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, 
KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, 
KafkaStarterUtils.getDefaultKafkaConfiguration());
-
-    // Create Kafka topic
-    KafkaStarterUtils.createTopic(getKafkaTopic(), 
KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions());
+    startKafka();
 
     // Start the Pinot cluster
     ControllerConf config = getDefaultControllerConfiguration();
@@ -301,7 +295,7 @@ public class HybridClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     stopServer();
     stopBroker();
     stopController();
-    KafkaStarterUtils.stopServer(_kafkaStarter);
+    stopKafka();
     stopZk();
     cleanup();
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 419a3cf..37df2e4 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -39,7 +39,6 @@ import org.testng.annotations.Test;
  * Integration test that creates a Kafka broker, creates a Pinot cluster that 
consumes from Kafka and queries Pinot.
  */
 public class RealtimeClusterIntegrationTest extends 
BaseClusterIntegrationTestSet {
-  private List<KafkaServerStartable> _kafkaStarters;
 
   @BeforeClass
   public void setUp()
@@ -79,13 +78,6 @@ public class RealtimeClusterIntegrationTest extends 
BaseClusterIntegrationTestSe
     waitForAllDocsLoaded(600_000L);
   }
 
-  protected void startKafka() {
-    _kafkaStarters = KafkaStarterUtils
-        .startServers(getNumKafkaBrokers(), 
KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
-            KafkaStarterUtils.getDefaultKafkaConfiguration());
-    KafkaStarterUtils.createTopic(getKafkaTopic(), 
KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions());
-  }
-
   protected void setUpTable(File avroFile)
       throws Exception {
     File schemaFile = getSchemaFile();
@@ -179,9 +171,7 @@ public class RealtimeClusterIntegrationTest extends 
BaseClusterIntegrationTestSe
     stopServer();
     stopBroker();
     stopController();
-    for (KafkaServerStartable kafkaStarter : _kafkaStarters) {
-      KafkaStarterUtils.stopServer(kafkaStarter);
-    }
+    stopKafka();
     stopZk();
     FileUtils.deleteDirectory(_tempDir);
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
deleted file mode 100644
index 9966840..0000000
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests.controller.periodic.tasks;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
-import org.apache.helix.model.IdealState;
-import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.utils.KafkaStarterUtils;
-import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.common.utils.retry.RetryPolicies;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet;
-import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
-import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-/**
- * Integration test to check {@link 
org.apache.pinot.controller.helix.SegmentStatusChecker} is
- * running and verify the metrics emitted
- */
-public class SegmentStatusCheckerIntegrationTest extends 
BaseClusterIntegrationTestSet {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentStatusCheckerIntegrationTest.class);
-
-  private String emptyTable = "table1_OFFLINE";
-  private String disabledOfflineTable = "table2_OFFLINE";
-  private String basicOfflineTable = "table3_OFFLINE";
-  private String errorOfflineTable = "table4_OFFLINE";
-  private String realtimeTableErrorState = "table5_REALTIME";
-  private String _currentTableName;
-  private static final int NUM_TABLES = 5;
-
-  private static final int SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS = 60;
-  private static final int SEGMENT_STATUS_CHECKER_FREQ_SECONDS = 5;
-
-  @BeforeClass
-  public void setUp() throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    startZk();
-
-    // Set initial delay of 60 seconds for the segment status checker, to 
allow time for tables setup.
-    // Run at 5 seconds freq in order to keep it running, in case first run 
happens before table setup
-    ControllerConf controllerConf = getDefaultControllerConfiguration();
-    
controllerConf.setStatusCheckerInitialDelayInSeconds(SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS);
-    
controllerConf.setStatusCheckerFrequencyInSeconds(SEGMENT_STATUS_CHECKER_FREQ_SECONDS);
-
-    startController(controllerConf);
-    startBroker();
-    startServers(3);
-
-    // empty table
-    setupOfflineTable(emptyTable);
-
-    // table with disabled ideal state
-    setupOfflineTable(disabledOfflineTable);
-    _helixAdmin.enableResource(_clusterName, disabledOfflineTable, false);
-
-    // happy case table
-    setupOfflineTableAndSegments(basicOfflineTable);
-
-    // some segments offline
-    setupOfflineTableAndSegments(errorOfflineTable);
-    HelixHelper.updateIdealState(_helixManager, errorOfflineTable, new 
Function<IdealState, IdealState>() {
-      @Nullable
-      @Override
-      public IdealState apply(@Nullable IdealState input) {
-        List<String> segmentNames = 
Lists.newArrayList(input.getPartitionSet());
-        Collections.sort(segmentNames);
-
-        Map<String, String> instanceStateMap1 = 
input.getInstanceStateMap(segmentNames.get(0));
-        for (String instance : instanceStateMap1.keySet()) {
-          instanceStateMap1.put(instance, "OFFLINE");
-          break;
-        }
-        return input;
-      }
-    }, RetryPolicies.fixedDelayRetryPolicy(2, 10));
-
-    // realtime table with segments in error state
-    setupRealtimeTable(realtimeTableErrorState);
-  }
-
-  private void setupOfflineTable(String table) throws Exception {
-    _realtimeTableConfig = null;
-    addOfflineTable(table);
-    completeTableConfiguration();
-  }
-
-  private void setupOfflineTableAndSegments(String table) throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-    setTableName(table);
-    _realtimeTableConfig = null;
-    addOfflineTable(table);
-    completeTableConfiguration();
-    List<File> avroFiles = unpackAvroData(_tempDir);
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0, 
_segmentDir, _tarDir, table, false, null, null,
-        null, executor);
-    executor.shutdown();
-    executor.awaitTermination(10, TimeUnit.MINUTES);
-    uploadSegments(_tarDir);
-
-    waitForAllDocsLoaded(600_000L);
-  }
-
-  private void setupRealtimeTable(String table) throws Exception {
-    _offlineTableConfig = null;
-    File schemaFile = getSchemaFile();
-    Schema schema = Schema.fromFile(schemaFile);
-    String schemaName = schema.getSchemaName();
-    addSchema(schemaFile, schemaName);
-
-    String timeColumnName = schema.getTimeColumnName();
-    Assert.assertNotNull(timeColumnName);
-    TimeUnit outgoingTimeUnit = schema.getOutgoingTimeUnit();
-    Assert.assertNotNull(outgoingTimeUnit);
-    String timeType = outgoingTimeUnit.toString();
-
-    addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, 
KafkaStarterUtils.DEFAULT_ZK_STR,
-        getKafkaTopic(), getRealtimeSegmentFlushSize(), null, timeColumnName, 
timeType, schemaName, null, null,
-        getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), 
getBloomFilterIndexColumns(), getRawIndexColumns(),
-        getTaskConfig(), getStreamConsumerFactoryClassName());
-    completeTableConfiguration();
-  }
-
-  @Override
-  public String getTableName() {
-    return _currentTableName;
-  }
-
-  private void setTableName(String tableName) {
-    _currentTableName = tableName;
-  }
-  /**
-   * After 1 run of SegmentStatusChecker the controllerMetrics will be set for 
each table
-   * Validate that we are seeing the expected numbers
-   */
-  @Test
-  public void testSegmentStatusChecker() {
-    ControllerMetrics controllerMetrics = 
_controllerStarter.getControllerMetrics();
-
-    long millisToWait = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
-    while 
(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
-        "SegmentStatusChecker") < NUM_TABLES && millisToWait > 0) {
-      try {
-        Thread.sleep(1000);
-        millisToWait -= 1000;
-      } catch (InterruptedException e) {
-        LOGGER.info("Interrupted while waiting for SegmentStatusChecker");
-      }
-    }
-
-    
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
-        "SegmentStatusChecker"), NUM_TABLES);
-
-    // empty table - table1_OFFLINE
-    // num replicas set from ideal state
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, 
ControllerGauge.NUMBER_OF_REPLICAS), 3);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, 
ControllerGauge.PERCENT_OF_REPLICAS), 100);
-    Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
-        100);
-
-    // disabled table - table2_OFFLINE
-    // reset to defaults
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(disabledOfflineTable, 
ControllerGauge.NUMBER_OF_REPLICAS),
-        Long.MIN_VALUE);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(disabledOfflineTable, 
ControllerGauge.PERCENT_OF_REPLICAS),
-        Long.MIN_VALUE);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(disabledOfflineTable, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
-        Long.MIN_VALUE);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(disabledOfflineTable, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
-        Long.MIN_VALUE);
-
-    // happy path table - table3_OFFLINE
-    IdealState idealState = 
_helixResourceManager.getTableIdealState(basicOfflineTable);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(basicOfflineTable, 
ControllerGauge.IDEALSTATE_ZNODE_SIZE),
-        idealState.toString().length());
-    
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, 
ControllerGauge.SEGMENT_COUNT),
-        (long) (idealState.getPartitionSet().size()));
-    
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, 
ControllerGauge.NUMBER_OF_REPLICAS),
-        3);
-    
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, 
ControllerGauge.PERCENT_OF_REPLICAS),
-        100);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(basicOfflineTable, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(basicOfflineTable, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-
-    // offline segments - table4_OFFLINE
-    // 2 replicas available out of 3, percent 66
-    idealState = _helixResourceManager.getTableIdealState(errorOfflineTable);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(errorOfflineTable, 
ControllerGauge.IDEALSTATE_ZNODE_SIZE),
-        idealState.toString().length());
-    
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, 
ControllerGauge.SEGMENT_COUNT),
-        (long) (idealState.getPartitionSet().size()));
-    
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, 
ControllerGauge.NUMBER_OF_REPLICAS),
-        2);
-    
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, 
ControllerGauge.PERCENT_OF_REPLICAS),
-        66);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(errorOfflineTable, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(errorOfflineTable, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-
-    // error segments - table5_REALTIME
-    // no replicas available, all segments in error state
-    idealState = 
_helixResourceManager.getTableIdealState(realtimeTableErrorState);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, 
ControllerGauge.IDEALSTATE_ZNODE_SIZE),
-        idealState.toString().length());
-    
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(realtimeTableErrorState,
 ControllerGauge.SEGMENT_COUNT),
-        (long) (idealState.getPartitionSet().size()));
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, 
ControllerGauge.NUMBER_OF_REPLICAS), 0);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, 
ControllerGauge.PERCENT_OF_REPLICAS), 0);
-    Assert.assertTrue(
-        controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE) > 0);
-    Assert.assertEquals(
-        controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 0);
-
-    // Total metrics
-    
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT),
 4);
-    
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT),
 1);
-    
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
 1);
-  }
-
-  @Override
-  protected boolean isUsingNewConfigFormat() {
-    return true;
-  }
-
-  @AfterClass
-  public void tearDown() throws Exception {
-    dropRealtimeTable(realtimeTableErrorState);
-    dropOfflineTable(emptyTable);
-    dropOfflineTable(disabledOfflineTable);
-    dropOfflineTable(basicOfflineTable);
-    dropOfflineTable(errorOfflineTable);
-
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-    FileUtils.deleteDirectory(_tempDir);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to