This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c96221f697 add RebalanceConfig class to replace use of 
BaseConfiguration for rebalance configs (#11730)
c96221f697 is described below

commit c96221f697ab9347a1e5dbf145a9d6d660c53d62
Author: Xiaobing <[email protected]>
AuthorDate: Wed Oct 4 11:51:51 2023 -0700

    add RebalanceConfig class to replace use of BaseConfiguration for rebalance 
configs (#11730)
    
    * add RebalanceConfig class to replace use of Configuration for rebalance 
configs
    
    * remove RebalanceConfigConstants
---
 .../apache/pinot/controller/ControllerConf.java    |   6 +-
 .../api/resources/PinotTableRestletResource.java   |  46 +++--
 .../api/resources/PinotTenantRestletResource.java  |  16 +-
 .../helix/core/PinotHelixResourceManager.java      |  14 +-
 .../segment/OfflineSegmentAssignment.java          |   9 +-
 .../segment/RealtimeSegmentAssignment.java         |  12 +-
 .../core/assignment/segment/SegmentAssignment.java |   4 +-
 .../helix/core/rebalance/RebalanceConfig.java      | 193 +++++++++++++++++++++
 .../helix/core/rebalance/RebalanceContext.java     | 138 ---------------
 ...tRebalancer.java => RebalanceJobConstants.java} |   9 +-
 .../helix/core/rebalance/TableRebalancer.java      |  37 ++--
 .../rebalance/ZkBasedTableRebalanceObserver.java   |   3 +-
 .../rebalance/tenant/DefaultTenantRebalancer.java  |  79 ++++-----
 ...anceContext.java => TenantRebalanceConfig.java} |   6 +-
 .../core/rebalance/tenant/TenantRebalancer.java    |   3 +-
 .../tenant/ZkBasedTenantRebalanceObserver.java     |   4 +-
 .../helix/core/relocation/SegmentRelocator.java    |  19 +-
 ...NonReplicaGroupTieredSegmentAssignmentTest.java |  20 ++-
 ...altimeNonReplicaGroupSegmentAssignmentTest.java |  18 +-
 ...NonReplicaGroupTieredSegmentAssignmentTest.java |  20 +--
 .../RealtimeReplicaGroupSegmentAssignmentTest.java |  22 ++-
 .../AllServersSegmentAssignmentStrategyTest.java   |   3 +-
 .../BalancedNumSegmentAssignmentStrategyTest.java  |  12 +-
 .../ReplicaGroupSegmentAssignmentStrategyTest.java |  48 +++--
 .../TableRebalancerClusterStatelessTest.java       |  45 +++--
 .../rebalance/tenant/TenantRebalancerTest.java     |  17 +-
 .../pinot/spi/utils/RebalanceConfigConstants.java  |  76 --------
 .../utils/builder/ControllerRequestURLBuilder.java |  16 +-
 .../apache/pinot/tools/PinotTableRebalancer.java   |  30 ++--
 .../tools/admin/command/RebalanceTableCommand.java |   6 +-
 30 files changed, 419 insertions(+), 512 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index fa6853191f..cb6ee99c4f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -30,10 +30,10 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.TimeUtils;
 
 import static 
org.apache.pinot.spi.utils.CommonConstants.Controller.CONFIG_OF_CONTROLLER_METRICS_PREFIX;
@@ -962,12 +962,12 @@ public class ControllerConf extends PinotConfiguration {
 
   public long getSegmentRelocatorExternalViewCheckIntervalInMs() {
     return 
getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
-        RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS);
+        RebalanceConfig.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS);
   }
 
   public long getSegmentRelocatorExternalViewStabilizationTimeoutInMs() {
     return 
getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
-        
RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS);
+        RebalanceConfig.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS);
   }
 
   public boolean isSegmentRelocatorRebalanceTablesSequentially() {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index b4dda6a56a..9cc7020a1d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -64,8 +64,6 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
@@ -94,6 +92,8 @@ import 
org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
@@ -114,7 +114,6 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.zookeeper.data.Stat;
@@ -621,20 +620,18 @@ public class PinotTableRestletResource {
 
     String tableNameWithType = constructTableNameWithType(tableName, 
tableTypeStr);
 
-    Configuration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, dryRun);
-    rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, 
reassignInstances);
-    rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, 
includeConsuming);
-    rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP, bootstrap);
-    rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, downtime);
-    
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
 minAvailableReplicas);
-    rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, 
bestEfforts);
-    
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
-        externalViewCheckIntervalInMs);
-    
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
-        externalViewStabilizationTimeoutInMs);
-    rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER, 
updateTargetTier);
-    rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID, 
TableRebalancer.createUniqueRebalanceJobIdentifier());
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(dryRun);
+    rebalanceConfig.setReassignInstances(reassignInstances);
+    rebalanceConfig.setIncludeConsuming(includeConsuming);
+    rebalanceConfig.setBootstrap(bootstrap);
+    rebalanceConfig.setDowntime(downtime);
+    rebalanceConfig.setMinAvailableReplicas(minAvailableReplicas);
+    rebalanceConfig.setBestEfforts(bestEfforts);
+    
rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs);
+    
rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs);
+    rebalanceConfig.setUpdateTargetTier(updateTargetTier);
+    
rebalanceConfig.setJobId(TableRebalancer.createUniqueRebalanceJobIdentifier());
 
     try {
       if (dryRun || downtime) {
@@ -642,13 +639,13 @@ public class PinotTableRestletResource {
         return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, false);
       } else {
         // Make a dry-run first to get the target assignment
-        rebalanceConfig.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+        rebalanceConfig.setDryRun(true);
         RebalanceResult dryRunResult =
             _pinotHelixResourceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, false);
 
         if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
           // If dry-run succeeded, run rebalance asynchronously
-          rebalanceConfig.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+          rebalanceConfig.setDryRun(false);
           _executorService.submit(() -> {
             try {
               _pinotHelixResourceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, true);
@@ -744,13 +741,12 @@ public class PinotTableRestletResource {
       throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + jobId,
           Response.Status.NOT_FOUND);
     }
-    TableRebalanceProgressStats tableRebalanceProgressStats =
-        
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
-            TableRebalanceProgressStats.class);
+    TableRebalanceProgressStats tableRebalanceProgressStats = 
JsonUtils.stringToObject(
+        
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
+        TableRebalanceProgressStats.class);
     long timeSinceStartInSecs = 0L;
-    if 
(!tableRebalanceProgressStats.getStatus().equals(RebalanceResult.Status.DONE)) {
-      timeSinceStartInSecs =
-          (System.currentTimeMillis() - 
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
+    if 
(!RebalanceResult.Status.DONE.toString().equals(tableRebalanceProgressStats.getStatus()))
 {
+      timeSinceStartInSecs = (System.currentTimeMillis() - 
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
     }
 
     ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new 
ServerRebalanceJobStatusResponse();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index f160bba80d..3aa5da48e9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -58,7 +58,8 @@ import org.apache.pinot.controller.api.access.Authenticate;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceContext;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
+import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceConfig;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceResult;
 import 
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
@@ -70,7 +71,6 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -584,9 +584,9 @@ public class PinotTenantRestletResource {
   @ApiOperation(value = "Rebalances all the tables that are part of the 
tenant")
   public TenantRebalanceResult rebalance(
       @ApiParam(value = "Name of the tenant whose table are to be rebalanced", 
required = true)
-      @PathParam("tenantName") String tenantName, @ApiParam(required = true) 
TenantRebalanceContext context) {
-    context.setTenantName(tenantName);
-    return _tenantRebalancer.rebalance(context);
+      @PathParam("tenantName") String tenantName, @ApiParam(required = true) 
TenantRebalanceConfig config) {
+    config.setTenantName(tenantName);
+    return _tenantRebalancer.rebalance(config);
   }
 
   @GET
@@ -606,9 +606,9 @@ public class PinotTenantRestletResource {
       throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + jobId,
           Response.Status.NOT_FOUND);
     }
-    TenantRebalanceProgressStats tenantRebalanceProgressStats =
-        
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
-            TenantRebalanceProgressStats.class);
+    TenantRebalanceProgressStats tenantRebalanceProgressStats = 
JsonUtils.stringToObject(
+        
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
+        TenantRebalanceProgressStats.class);
     long timeSinceStartInSecs = 
tenantRebalanceProgressStats.getTimeToFinishInSeconds();
     if (tenantRebalanceProgressStats.getCompletionStatusMsg() == null) {
       timeSinceStartInSecs =
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 7ecd25aab1..c9bf0c52af 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -56,7 +56,6 @@ import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.Response;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
@@ -144,6 +143,7 @@ import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
 import org.apache.pinot.controller.helix.core.lineage.LineageManager;
 import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import 
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
@@ -175,7 +175,6 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -3081,21 +3080,20 @@ public class PinotHelixResourceManager {
    * Entry point for table Rebalacing.
    * @param tableNameWithType
    * @param rebalanceConfig
-   * @param trackRebalanceProgress - Do we want to track rebalance progress 
stats
+   * @param trackRebalanceProgress whether to track rebalance progress stats
    * @return RebalanceResult
    * @throws TableNotFoundException
    */
-  public RebalanceResult rebalanceTable(String tableNameWithType, 
Configuration rebalanceConfig,
+  public RebalanceResult rebalanceTable(String tableNameWithType, 
RebalanceConfig rebalanceConfig,
       boolean trackRebalanceProgress)
       throws TableNotFoundException {
     TableConfig tableConfig = getTableConfig(tableNameWithType);
     if (tableConfig == null) {
       throw new TableNotFoundException("Failed to find table config for table: 
" + tableNameWithType);
     }
-    String rebalanceJobId = 
rebalanceConfig.getString(RebalanceConfigConstants.JOB_ID);
-    Preconditions.checkState(rebalanceJobId != null, "RebalanceId not 
populated in the rebalanceConfig ");
-    if (rebalanceConfig.getBoolean(RebalanceConfigConstants.UPDATE_TARGET_TIER,
-        RebalanceConfigConstants.DEFAULT_UPDATE_TARGET_TIER)) {
+    String rebalanceJobId = rebalanceConfig.getJobId();
+    Preconditions.checkState(rebalanceJobId != null, "RebalanceId not 
populated in the rebalanceConfig");
+    if (rebalanceConfig.isUpdateTargetTier()) {
       updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
     }
     ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index 36f784515a..05eb7d4048 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -23,15 +23,14 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.tier.Tier;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.AllServersSegmentAssignmentStrategy;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 
 
 /**
@@ -61,7 +60,7 @@ public class OfflineSegmentAssignment extends 
BaseSegmentAssignment {
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
@Nullable List<Tier> sortedTiers,
-      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
Configuration config) {
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
RebalanceConfig config) {
     InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
     Preconditions
         .checkState(offlineInstancePartitions != null, "Failed to find OFFLINE 
instance partitions for table: %s",
@@ -78,9 +77,7 @@ public class OfflineSegmentAssignment extends 
BaseSegmentAssignment {
       return segmentAssignmentStrategy
           .reassignSegments(currentAssignment, offlineInstancePartitions, 
InstancePartitionsType.OFFLINE);
     }
-    boolean bootstrap =
-        config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, 
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
-
+    boolean bootstrap = config.isBootstrap();
     // Rebalance tiers first
     Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, 
String>>> pair =
         rebalanceTiers(currentAssignment, sortedTiers, 
tierInstancePartitionsMap, bootstrap,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index db8b375a4c..434dbec4e3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -25,15 +25,14 @@ import java.util.Map;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.tier.Tier;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 
 
 /**
@@ -174,17 +173,14 @@ public class RealtimeSegmentAssignment extends 
BaseSegmentAssignment {
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
@Nullable List<Tier> sortedTiers,
-      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
Configuration config) {
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
RebalanceConfig config) {
     InstancePartitions completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
     InstancePartitions consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
     Preconditions
         .checkState(consumingInstancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
             _tableNameWithType);
-    boolean includeConsuming = config
-        .getBoolean(RebalanceConfigConstants.INCLUDE_CONSUMING, 
RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING);
-    boolean bootstrap =
-        config.getBoolean(RebalanceConfigConstants.BOOTSTRAP, 
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
-
+    boolean includeConsuming = config.isIncludeConsuming();
+    boolean bootstrap = config.isBootstrap();
     // Rebalance tiers first
     Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String, 
String>>> pair =
         rebalanceTiers(currentAssignment, sortedTiers, 
tierInstancePartitionsMap, bootstrap,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
index c2f8f5d61b..12ec0b5545 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
@@ -21,11 +21,11 @@ package 
org.apache.pinot.controller.helix.core.assignment.segment;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.commons.configuration.Configuration;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 
@@ -65,5 +65,5 @@ public interface SegmentAssignment {
    */
   Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, 
String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
@Nullable List<Tier> sortedTiers,
-      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
Configuration config);
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, 
RebalanceConfig config);
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
new file mode 100644
index 0000000000..b2a307f2e5
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+
+@ApiModel
+public class RebalanceConfig {
+  public static final int DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME = 1;
+  public static final long DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS = 1000L; 
// 1 second
+  public static final long DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS = 
3600000L; // 1 hour
+
+  // Whether to rebalance table in dry-run mode
+  @JsonProperty("dryRun")
+  @ApiModelProperty(example = "false")
+  private boolean _dryRun = false;
+
+  // Whether to reassign instances before reassigning segments
+  @JsonProperty("reassignInstances")
+  @ApiModelProperty(example = "false")
+  private boolean _reassignInstances = false;
+
+  // Whether to reassign CONSUMING segments
+  @JsonProperty("includeConsuming")
+  @ApiModelProperty(example = "false")
+  private boolean _includeConsuming = false;
+
+  // Whether to rebalance table in bootstrap mode (regardless of minimum 
segment movement, reassign all segments in a
+  // round-robin fashion as if adding new segments to an empty table)
+  @JsonProperty("bootstrap")
+  @ApiModelProperty(example = "false")
+  private boolean _bootstrap = false;
+
+  // Whether to allow downtime for the rebalance
+  @JsonProperty("downtime")
+  @ApiModelProperty(example = "false")
+  private boolean _downtime = false;
+
+  // For no-downtime rebalance, minimum number of replicas to keep alive 
during rebalance, or maximum number of replicas
+  // allowed to be unavailable if value is negative
+  @JsonProperty("minAvailableReplicas")
+  @ApiModelProperty(example = "1")
+  private int _minAvailableReplicas = 
DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME;
+
+  // Whether to use best-efforts to rebalance (not fail the rebalance when the 
no-downtime contract cannot be achieved)
+  // When using best-efforts to rebalance, the following scenarios won't fail 
the rebalance (will log warnings instead):
+  // - Segment falls into ERROR state in ExternalView -> count ERROR state as 
good state
+  // - ExternalView has not converged within the maximum wait time -> continue 
to the next stage
+  @JsonProperty("bestEfforts")
+  @ApiModelProperty(example = "false")
+  private boolean _bestEfforts = false;
+
+  // The check on external view can be very costly when the table has very 
large ideal and external states, i.e. when
+  // having a huge number of segments. These two configs help reduce the cpu 
load on controllers, e.g. by doing the
+  // check less frequently and bail out sooner to rebalance at best effort if 
configured so.
+  @JsonProperty("externalViewCheckIntervalInMs")
+  @ApiModelProperty(example = "1000")
+  private long _externalViewCheckIntervalInMs = 
DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS;
+
+  @JsonProperty("externalViewStabilizationTimeoutInMs")
+  @ApiModelProperty(example = "3600000")
+  private long _externalViewStabilizationTimeoutInMs = 
DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS;
+
+  @JsonProperty("updateTargetTier")
+  @ApiModelProperty(example = "false")
+  private boolean _updateTargetTier = false;
+
+  @JsonProperty("jobId")
+  private String _jobId = null;
+
+  public boolean isDryRun() {
+    return _dryRun;
+  }
+
+  public void setDryRun(boolean dryRun) {
+    _dryRun = dryRun;
+  }
+
+  public boolean isReassignInstances() {
+    return _reassignInstances;
+  }
+
+  public void setReassignInstances(boolean reassignInstances) {
+    _reassignInstances = reassignInstances;
+  }
+
+  public boolean isIncludeConsuming() {
+    return _includeConsuming;
+  }
+
+  public void setIncludeConsuming(boolean includeConsuming) {
+    _includeConsuming = includeConsuming;
+  }
+
+  public boolean isBootstrap() {
+    return _bootstrap;
+  }
+
+  public void setBootstrap(boolean bootstrap) {
+    _bootstrap = bootstrap;
+  }
+
+  public boolean isDowntime() {
+    return _downtime;
+  }
+
+  public void setDowntime(boolean downtime) {
+    _downtime = downtime;
+  }
+
+  public int getMinAvailableReplicas() {
+    return _minAvailableReplicas;
+  }
+
+  public void setMinAvailableReplicas(int minAvailableReplicas) {
+    _minAvailableReplicas = minAvailableReplicas;
+  }
+
+  public boolean isBestEfforts() {
+    return _bestEfforts;
+  }
+
+  public void setBestEfforts(boolean bestEfforts) {
+    _bestEfforts = bestEfforts;
+  }
+
+  public long getExternalViewCheckIntervalInMs() {
+    return _externalViewCheckIntervalInMs;
+  }
+
+  public void setExternalViewCheckIntervalInMs(long 
externalViewCheckIntervalInMs) {
+    _externalViewCheckIntervalInMs = externalViewCheckIntervalInMs;
+  }
+
+  public long getExternalViewStabilizationTimeoutInMs() {
+    return _externalViewStabilizationTimeoutInMs;
+  }
+
+  public void setExternalViewStabilizationTimeoutInMs(long 
externalViewStabilizationTimeoutInMs) {
+    _externalViewStabilizationTimeoutInMs = 
externalViewStabilizationTimeoutInMs;
+  }
+
+  public boolean isUpdateTargetTier() {
+    return _updateTargetTier;
+  }
+
+  public void setUpdateTargetTier(boolean updateTargetTier) {
+    _updateTargetTier = updateTargetTier;
+  }
+
+  public String getJobId() {
+    return _jobId;
+  }
+
+  public void setJobId(String jobId) {
+    _jobId = jobId;
+  }
+
+  public static RebalanceConfig copy(RebalanceConfig cfg) {
+    RebalanceConfig rc = new RebalanceConfig();
+    rc._dryRun = cfg._dryRun;
+    rc._reassignInstances = cfg._reassignInstances;
+    rc._includeConsuming = cfg._includeConsuming;
+    rc._bootstrap = cfg._bootstrap;
+    rc._downtime = cfg._downtime;
+    rc._minAvailableReplicas = cfg._minAvailableReplicas;
+    rc._bestEfforts = cfg._bestEfforts;
+    rc._externalViewCheckIntervalInMs = cfg._externalViewCheckIntervalInMs;
+    rc._externalViewStabilizationTimeoutInMs = 
cfg._externalViewStabilizationTimeoutInMs;
+    rc._updateTargetTier = cfg._updateTargetTier;
+    rc._jobId = cfg._jobId;
+    return rc;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
deleted file mode 100644
index cd6e06c399..0000000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.rebalance;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-
-@ApiModel
-public class RebalanceContext {
-  // TODO : simplify the rebalance configs wherever possible
-  @JsonProperty("dryRun")
-  @ApiModelProperty(example = "false")
-  private Boolean _dryRun = false;
-  @JsonProperty("reassignInstances")
-  @ApiModelProperty(example = "false")
-  private Boolean _reassignInstances = false;
-  @JsonProperty("includeConsuming")
-  @ApiModelProperty(example = "false")
-  private Boolean _includeConsuming = false;
-  @JsonProperty("bootstrap")
-  @ApiModelProperty(example = "false")
-  private Boolean _bootstrap = false;
-  @JsonProperty("downtime")
-  @ApiModelProperty(example = "false")
-  private Boolean _downtime = false;
-  @JsonProperty("minAvailableReplicas")
-  @ApiModelProperty(example = "1")
-  private Integer _minAvailableReplicas = 1;
-  @JsonProperty("bestEfforts")
-  @ApiModelProperty(example = "false")
-  private Boolean _bestEfforts = false;
-  @JsonProperty("externalViewCheckIntervalInMs")
-  @ApiModelProperty(example = "1000")
-  private Long _externalViewCheckIntervalInMs = 1000L;
-  @JsonProperty("externalViewStabilizationTimeoutInMs")
-  @ApiModelProperty(example = "3600000")
-  private Long _externalViewStabilizationTimeoutInMs = 3600000L;
-  @JsonProperty("updateTargetTier")
-  @ApiModelProperty(example = "false")
-  private Boolean _updateTargetTier = false;
-
-  public Boolean isDryRun() {
-    return _dryRun;
-  }
-
-  public void setDryRun(Boolean dryRun) {
-    _dryRun = dryRun;
-  }
-
-  public Boolean isReassignInstances() {
-    return _reassignInstances;
-  }
-
-  public void setReassignInstances(Boolean reassignInstances) {
-    _reassignInstances = reassignInstances;
-  }
-
-  public Boolean isIncludeConsuming() {
-    return _includeConsuming;
-  }
-
-  public void setIncludeConsuming(Boolean includeConsuming) {
-    _includeConsuming = includeConsuming;
-  }
-
-  public Boolean isBootstrap() {
-    return _bootstrap;
-  }
-
-  public void setBootstrap(Boolean bootstrap) {
-    _bootstrap = bootstrap;
-  }
-
-  public Boolean isDowntime() {
-    return _downtime;
-  }
-
-  public void setDowntime(Boolean downtime) {
-    _downtime = downtime;
-  }
-
-  public Integer getMinAvailableReplicas() {
-    return _minAvailableReplicas;
-  }
-
-  public void setMinAvailableReplicas(Integer minAvailableReplicas) {
-    _minAvailableReplicas = minAvailableReplicas;
-  }
-
-  public Boolean isBestEfforts() {
-    return _bestEfforts;
-  }
-
-  public void setBestEfforts(Boolean bestEfforts) {
-    _bestEfforts = bestEfforts;
-  }
-
-  public Long getExternalViewCheckIntervalInMs() {
-    return _externalViewCheckIntervalInMs;
-  }
-
-  public void setExternalViewCheckIntervalInMs(Long 
externalViewCheckIntervalInMs) {
-    _externalViewCheckIntervalInMs = externalViewCheckIntervalInMs;
-  }
-
-  public Long getExternalViewStabilizationTimeoutInMs() {
-    return _externalViewStabilizationTimeoutInMs;
-  }
-
-  public void setExternalViewStabilizationTimeoutInMs(Long 
externalViewStabilizationTimeoutInMs) {
-    _externalViewStabilizationTimeoutInMs = 
externalViewStabilizationTimeoutInMs;
-  }
-
-  public Boolean isUpdateTargetTier() {
-    return _updateTargetTier;
-  }
-
-  public void setUpdateTargetTier(Boolean updateTargetTier) {
-    _updateTargetTier = updateTargetTier;
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
similarity index 74%
copy from 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
copy to 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
index 53df7824d5..8070d44972 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.controller.helix.core.rebalance.tenant;
+package org.apache.pinot.controller.helix.core.rebalance;
 
+public class RebalanceJobConstants {
+  private RebalanceJobConstants() {
+  }
 
-public interface TenantRebalancer {
-  TenantRebalanceResult rebalance(TenantRebalanceContext context);
+  // Progress status of the rebalance operation
+  public static final String JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS = 
"REBALANCE_PROGRESS_STATS";
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 1a3b7e12f6..74e0f2f5a6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -33,7 +33,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.ToIntFunction;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.helix.AccessOption;
@@ -64,7 +63,6 @@ import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,40 +136,27 @@ public class TableRebalancer {
     return UUID.randomUUID().toString();
   }
 
-  public RebalanceResult rebalance(TableConfig tableConfig, Configuration 
rebalanceConfig) {
+  public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig 
rebalanceConfig) {
     long startTimeMs = System.currentTimeMillis();
     String tableNameWithType = tableConfig.getTableName();
-    String rebalanceJobId = 
rebalanceConfig.getString(RebalanceConfigConstants.JOB_ID);
+    String rebalanceJobId = rebalanceConfig.getJobId();
     if (rebalanceJobId == null) {
       // If not passed along, create one.
       // TODO - Add rebalanceJobId to all log messages for easy tracking.
       rebalanceJobId = createUniqueRebalanceJobIdentifier();
     }
-
-    boolean dryRun =
-        rebalanceConfig.getBoolean(RebalanceConfigConstants.DRY_RUN, 
RebalanceConfigConstants.DEFAULT_DRY_RUN);
-    boolean reassignInstances = 
rebalanceConfig.getBoolean(RebalanceConfigConstants.REASSIGN_INSTANCES,
-        RebalanceConfigConstants.DEFAULT_REASSIGN_INSTANCES);
-    boolean includeConsuming = 
rebalanceConfig.getBoolean(RebalanceConfigConstants.INCLUDE_CONSUMING,
-        RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING);
-    boolean bootstrap =
-        rebalanceConfig.getBoolean(RebalanceConfigConstants.BOOTSTRAP, 
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
-    boolean downtime =
-        rebalanceConfig.getBoolean(RebalanceConfigConstants.DOWNTIME, 
RebalanceConfigConstants.DEFAULT_DOWNTIME);
-    int minReplicasToKeepUpForNoDowntime =
-        
rebalanceConfig.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
-            
RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME);
+    boolean dryRun = rebalanceConfig.isDryRun();
+    boolean reassignInstances = rebalanceConfig.isReassignInstances();
+    boolean includeConsuming = rebalanceConfig.isIncludeConsuming();
+    boolean bootstrap = rebalanceConfig.isBootstrap();
+    boolean downtime = rebalanceConfig.isDowntime();
+    int minReplicasToKeepUpForNoDowntime = 
rebalanceConfig.getMinAvailableReplicas();
+    boolean bestEfforts = rebalanceConfig.isBestEfforts();
+    long externalViewCheckIntervalInMs = 
rebalanceConfig.getExternalViewCheckIntervalInMs();
+    long externalViewStabilizationTimeoutInMs = 
rebalanceConfig.getExternalViewStabilizationTimeoutInMs();
     boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
         && 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
         tableConfig.getRoutingConfig().getInstanceSelectorType());
-    boolean bestEfforts = 
rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS,
-        RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
-    long externalViewCheckIntervalInMs =
-        
rebalanceConfig.getLong(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
-            
RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS);
-    long externalViewStabilizationTimeoutInMs =
-        
rebalanceConfig.getLong(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
-            
RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS);
     LOGGER.info(
         "Start rebalancing table: {} with dryRun: {}, reassignInstances: {}, 
includeConsuming: {}, bootstrap: {}, "
             + "downtime: {}, minReplicasToKeepUpForNoDowntime: {}, 
enableStrictReplicaGroup: {}, bestEfforts: {}, "
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 5db4bebf58..1978df35f3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -27,7 +27,6 @@ import 
org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,7 +133,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TABLE_REBALANCE.name());
     try {
-      jobMetadata.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS,
+      
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
           JsonUtils.objectToString(_tableRebalanceProgressStats));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance stats to JSON for persisting 
to ZK {}", _rebalanceJobId, e);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
index d0278c15e3..04d18230ae 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -29,13 +29,11 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,21 +48,22 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
   }
 
   @Override
-  public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+  public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
     Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
-    Set<String> tables = getTenantTables(context.getTenantName());
+    Set<String> tables = getTenantTables(config.getTenantName());
     tables.forEach(table -> {
       try {
-        Configuration config = extractRebalanceConfig(context);
-        config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
-        rebalanceResult.put(table, 
_pinotHelixResourceManager.rebalanceTable(table, config, false));
+        RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+        rebalanceConfig.setJobId(createUniqueRebalanceJobIdentifier());
+        rebalanceConfig.setDryRun(true);
+        rebalanceResult.put(table, 
_pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, false));
       } catch (TableNotFoundException exception) {
         rebalanceResult.put(table, new RebalanceResult(null, 
RebalanceResult.Status.FAILED, exception.getMessage(),
             null, null, null));
       }
     });
-    if (context.isDryRun() || context.isDowntime()) {
-      return new TenantRebalanceResult(null, rebalanceResult, 
context.isVerboseResult());
+    if (config.isDryRun() || config.isDowntime()) {
+      return new TenantRebalanceResult(null, rebalanceResult, 
config.isVerboseResult());
     } else {
       for (String table : rebalanceResult.keySet()) {
         RebalanceResult result = rebalanceResult.get(table);
@@ -77,25 +76,25 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
     }
 
     String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
-    TenantRebalanceObserver observer = new 
ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+    TenantRebalanceObserver observer = new 
ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
         tables, _pinotHelixResourceManager);
     observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, 
null);
     final Deque<String> sequentialQueue = new LinkedList<>();
     final Deque<String> parallelQueue = new ConcurrentLinkedDeque<>();
     // ensure atleast 1 thread is created to run the sequential table 
rebalance operations
-    int parallelism = Math.max(context.getDegreeOfParallelism(), 1);
-    Set<String> dimTables = getDimensionalTables(context.getTenantName());
+    int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+    Set<String> dimTables = getDimensionalTables(config.getTenantName());
     AtomicInteger activeThreads = new AtomicInteger(parallelism);
     try {
       if (parallelism > 1) {
         Set<String> parallelTables;
-        if (!context.getParallelWhitelist().isEmpty()) {
-          parallelTables = new HashSet<>(context.getParallelWhitelist());
+        if (!config.getParallelWhitelist().isEmpty()) {
+          parallelTables = new HashSet<>(config.getParallelWhitelist());
         } else {
           parallelTables = new HashSet<>(tables);
         }
-        if (!context.getParallelBlacklist().isEmpty()) {
-          parallelTables = Sets.difference(parallelTables, 
context.getParallelBlacklist());
+        if (!config.getParallelBlacklist().isEmpty()) {
+          parallelTables = Sets.difference(parallelTables, 
config.getParallelBlacklist());
         }
         parallelTables.forEach(table -> {
           if (dimTables.contains(table)) {
@@ -131,33 +130,33 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
             if (table == null) {
               break;
             }
-            Configuration config = extractRebalanceConfig(context);
-            config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
-            config.setProperty(RebalanceConfigConstants.JOB_ID, 
rebalanceResult.get(table).getJobId());
-            rebalanceTable(table, config, observer);
+            RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+            rebalanceConfig.setDryRun(false);
+            rebalanceConfig.setJobId(rebalanceResult.get(table).getJobId());
+            rebalanceTable(table, rebalanceConfig, observer);
           }
           // Last parallel thread to finish the table rebalance job will pick 
up the
           // sequential table rebalance execution
           if (activeThreads.decrementAndGet() == 0) {
-            Configuration config = extractRebalanceConfig(context);
-            config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+            RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+            rebalanceConfig.setDryRun(false);
             while (true) {
               String table = sequentialQueue.pollFirst();
               if (table == null) {
                 break;
               }
-              config.setProperty(RebalanceConfigConstants.JOB_ID, 
rebalanceResult.get(table).getJobId());
-              rebalanceTable(table, config, observer);
+              rebalanceConfig.setJobId(rebalanceResult.get(table).getJobId());
+              rebalanceTable(table, rebalanceConfig, observer);
             }
-            observer.onSuccess(String.format("Successfully rebalanced tenant 
%s.", context.getTenantName()));
+            observer.onSuccess(String.format("Successfully rebalanced tenant 
%s.", config.getTenantName()));
           }
         });
       }
     } catch (Exception exception) {
-      observer.onError(String.format("Failed to rebalance the tenant %s. 
Cause: %s", context.getTenantName(),
+      observer.onError(String.format("Failed to rebalance the tenant %s. 
Cause: %s", config.getTenantName(),
           exception.getMessage()));
     }
-    return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult, 
context.isVerboseResult());
+    return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult, 
config.isVerboseResult());
   }
 
   private Set<String> getDimensionalTables(String tenantName) {
@@ -175,25 +174,6 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
     return dimTables;
   }
 
-  private Configuration extractRebalanceConfig(TenantRebalanceContext context) 
{
-    Configuration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, 
context.isDryRun());
-    rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, 
context.isReassignInstances());
-    rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, 
context.isIncludeConsuming());
-    rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP, 
context.isBootstrap());
-    rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, 
context.isDowntime());
-    
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
-        context.getMinAvailableReplicas());
-    rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, 
context.isBestEfforts());
-    
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
-        context.getExternalViewCheckIntervalInMs());
-    
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
-        context.getExternalViewStabilizationTimeoutInMs());
-    rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER, 
context.isUpdateTargetTier());
-    rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID, 
createUniqueRebalanceJobIdentifier());
-    return rebalanceConfig;
-  }
-
   private String createUniqueRebalanceJobIdentifier() {
     return UUID.randomUUID().toString();
   }
@@ -214,11 +194,10 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
     return tables;
   }
 
-  private void rebalanceTable(String tableName, Configuration config,
+  private void rebalanceTable(String tableName, RebalanceConfig config,
       TenantRebalanceObserver observer) {
     try {
-      
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, 
tableName,
-          config.getString(RebalanceConfigConstants.JOB_ID));
+      
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, 
tableName, config.getJobId());
       RebalanceResult result = 
_pinotHelixResourceManager.rebalanceTable(tableName, config, true);
       if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
         
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, 
tableName, null);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java
similarity index 95%
rename from 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
rename to 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java
index 5e76dcc014..3ca6bd777e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java
@@ -23,15 +23,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import io.swagger.annotations.ApiModelProperty;
 import java.util.HashSet;
 import java.util.Set;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceContext;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 
 
-public class TenantRebalanceContext extends RebalanceContext {
+public class TenantRebalanceConfig extends RebalanceConfig {
   @JsonIgnore
   private String _tenantName;
   @JsonProperty("degreeOfParallelism")
   @ApiModelProperty(example = "1")
-  private Integer _degreeOfParallelism = 1;
+  private int _degreeOfParallelism = 1;
   @JsonProperty("parallelWhitelist")
   private Set<String> _parallelWhitelist = new HashSet<>();
   @JsonProperty("parallelBlacklist")
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
index 53df7824d5..4b28b30569 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.controller.helix.core.rebalance.tenant;
 
-
 public interface TenantRebalancer {
-  TenantRebalanceResult rebalance(TenantRebalanceContext context);
+  TenantRebalanceResult rebalance(TenantRebalanceConfig config);
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index 7521caa3f3..6c26985539 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -28,9 +28,9 @@ import java.util.Set;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,7 +103,7 @@ public class ZkBasedTenantRebalanceObserver implements 
TenantRebalanceObserver {
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, 
Long.toString(System.currentTimeMillis()));
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, 
ControllerJobType.TENANT_REBALANCE.name());
     try {
-      jobMetadata.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS,
+      
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
           JsonUtils.objectToString(_progressStats));
     } catch (JsonProcessingException e) {
       LOGGER.error("Error serialising rebalance stats to JSON for persisting 
to ZK {}", _jobId, e);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index ad9cd1f0f5..0a77f9ff1f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -30,8 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.Criteria;
 import org.apache.helix.InstanceType;
@@ -44,12 +42,12 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -166,15 +164,12 @@ public class SegmentRelocator extends 
ControllerPeriodicTask<Void> {
     }
 
     // Allow at most one replica unavailable during relocation
-    Configuration rebalanceConfig = new BaseConfiguration();
-    
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
 -1);
-    
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
-        _externalViewCheckIntervalInMs);
-    
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
-        _externalViewStabilizationTimeoutInMs);
-    rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER,
-        TierConfigUtils.shouldRelocateToTiers(tableConfig));
-    rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID, 
TableRebalancer.createUniqueRebalanceJobIdentifier());
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setMinAvailableReplicas(-1);
+    
rebalanceConfig.setExternalViewCheckIntervalInMs(_externalViewCheckIntervalInMs);
+    
rebalanceConfig.setExternalViewStabilizationTimeoutInMs(_externalViewStabilizationTimeoutInMs);
+    
rebalanceConfig.setUpdateTargetTier(TierConfigUtils.shouldRelocateToTiers(tableConfig));
+    
rebalanceConfig.setJobId(TableRebalancer.createUniqueRebalanceJobIdentifier());
 
     try {
       // Relocating segments to new tiers needs two sequential actions: table 
rebalance and local tier migration.
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
index af75b05c23..c9a42536d3 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -25,20 +25,19 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
+import java.util.TreeSet;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.tier.PinotServerTierStorage;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.tier.TierSegmentSelector;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -165,7 +164,7 @@ public class 
OfflineNonReplicaGroupTieredSegmentAssignmentTest {
     // On rebalancing, segments move to tiers
     Map<String, Map<String, String>> newAssignment =
         _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, _sortedTiers,
-            _tierInstancePartitionsMap, new BaseConfiguration());
+            _tierInstancePartitionsMap, new RebalanceConfig());
     assertEquals(newAssignment.size(), NUM_SEGMENTS);
 
     // segments 0-49 remain unchanged
@@ -205,7 +204,7 @@ public class 
OfflineNonReplicaGroupTieredSegmentAssignmentTest {
 
     // rebalance without tierInstancePartitions resets the assignment
     Map<String, Map<String, String>> resetAssignment =
-        _segmentAssignment.rebalanceTable(newAssignment, 
_instancePartitionsMap, null, null, new BaseConfiguration());
+        _segmentAssignment.rebalanceTable(newAssignment, 
_instancePartitionsMap, null, null, new RebalanceConfig());
     for (String segment : SEGMENTS) {
       
Assert.assertTrue(INSTANCES.containsAll(resetAssignment.get(segment).keySet()));
     }
@@ -214,7 +213,10 @@ public class 
OfflineNonReplicaGroupTieredSegmentAssignmentTest {
   @Test
   public void testBootstrapTable() {
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
-    for (String segmentName : SEGMENTS) {
+    // The list of segments are segment_0, segment_1, ... segment_10, ...; but 
TreeMap sorts segments as segment_0,
+    // segment_1, segment_10, ... segment_2, ... So to make bootstrap generate 
same assignment as assigning each
+    // segment separately, we need to process the segments in the same order.
+    for (String segmentName : new TreeSet<>(SEGMENTS)) {
       List<String> instancesAssigned =
           _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
       currentAssignment.put(segmentName,
@@ -222,11 +224,11 @@ public class 
OfflineNonReplicaGroupTieredSegmentAssignmentTest {
     }
 
     // Bootstrap table should reassign all segments
-    Configuration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setBootstrap(true);
     Map<String, Map<String, String>> newAssignment =
         _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, _sortedTiers,
-            _tierInstancePartitionsMap, new BaseConfiguration());
+            _tierInstancePartitionsMap, rebalanceConfig);
     assertEquals(newAssignment.size(), NUM_SEGMENTS);
 
     // segments 0-49 remain unchanged
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 4ab378e3a7..9d921d3af8 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -24,18 +24,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -227,13 +226,12 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest 
{
     Map<InstancePartitionsType, InstancePartitions> 
noRelocationInstancePartitionsMap =
         ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, 
noRelocationInstancePartitionsMap, null, null,
-        new BaseConfiguration()), currentAssignment);
+        new RebalanceConfig()), currentAssignment);
 
     // Rebalance with COMPLETED instance partitions should relocate all 
COMPLETED (ONLINE) segments to the COMPLETED
     // instances
     Map<String, Map<String, String>> newAssignment =
-        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null,
-            new BaseConfiguration());
+        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null, new RebalanceConfig());
     assertEquals(newAssignment.size(), NUM_SEGMENTS + numUploadedSegments + 1);
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
@@ -270,8 +268,8 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
     assertEquals(totalNumSegmentsAssigned, expectedTotalNumSegmentsAssigned);
 
     // Rebalance with COMPLETED instance partitions including CONSUMING 
segments should give the same assignment
-    BaseConfiguration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, 
true);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setIncludeConsuming(true);
     assertEquals(
         _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null, rebalanceConfig),
         newAssignment);
@@ -279,11 +277,11 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest 
{
     // Rebalance without COMPLETED instance partitions again should change the 
segment assignment back
     currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
     assertEquals(_segmentAssignment.rebalanceTable(newAssignment, 
noRelocationInstancePartitionsMap, null, null,
-        new BaseConfiguration()), currentAssignment);
+        new RebalanceConfig()), currentAssignment);
 
     // Bootstrap table without COMPLETED instance partitions should be the 
same as regular rebalance
-    rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setBootstrap(true);
     assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, 
noRelocationInstancePartitionsMap, null, null,
         rebalanceConfig), currentAssignment);
     assertEquals(_segmentAssignment.rebalanceTable(newAssignment, 
noRelocationInstancePartitionsMap, null, null,
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index f540350e64..e03a8d4a46 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.tier.PinotServerTierStorage;
@@ -33,13 +32,13 @@ import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.tier.TierSegmentSelector;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -184,8 +183,7 @@ public class 
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
 
     // Rebalance without tier instancePartitions moves all instances to 
COMPLETED
     Map<String, Map<String, String>> newAssignment =
-        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null,
-            new BaseConfiguration());
+        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null, new RebalanceConfig());
     assertEquals(newAssignment.size(), NUM_SEGMENTS);
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { // ONLINE segments
@@ -208,7 +206,7 @@ public class 
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
     int expectedOnTierB = 20;
     int expectedOnCompleted = NUM_SEGMENTS - NUM_PARTITIONS - expectedOnTierA 
- expectedOnTierB;
     newAssignment = _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, _sortedTiers,
-        _tierInstancePartitionsMap, new BaseConfiguration());
+        _tierInstancePartitionsMap, new RebalanceConfig());
     assertEquals(newAssignment.size(), NUM_SEGMENTS);
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
@@ -259,8 +257,8 @@ public class 
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
     }
 
     // Rebalance with including CONSUMING should give the same assignment
-    BaseConfiguration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, 
true);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setIncludeConsuming(true);
     assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, _sortedTiers,
         _tierInstancePartitionsMap, rebalanceConfig), newAssignment);
 
@@ -270,13 +268,13 @@ public class 
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
     noRelocationInstancePartitionsMap.put(InstancePartitionsType.CONSUMING,
         _instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     assertEquals(_segmentAssignment.rebalanceTable(newAssignment, 
noRelocationInstancePartitionsMap, null, null,
-        new BaseConfiguration()), currentAssignment);
+        new RebalanceConfig()), currentAssignment);
 
     // Rebalance without COMPLETED instance partitions and with 
tierInstancePartitions should move ONLINE segments to
     // Tiers and CONSUMING segments to CONSUMING tenant.
     newAssignment =
         _segmentAssignment.rebalanceTable(currentAssignment, 
noRelocationInstancePartitionsMap, _sortedTiers,
-            _tierInstancePartitionsMap, new BaseConfiguration());
+            _tierInstancePartitionsMap, new RebalanceConfig());
 
     numSegmentsAssignedPerInstance =
         
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, 
INSTANCES_TIER_A);
@@ -299,8 +297,8 @@ public class 
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
     assertEquals(numSegmentsAssignedPerInstance.length, 
NUM_CONSUMING_INSTANCES);
 
     // Bootstrap
-    rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setBootstrap(true);
     newAssignment = _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, _sortedTiers,
         _tierInstancePartitionsMap, rebalanceConfig);
     int index = 0;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index a6da0552cd..2c590ef25e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -25,12 +25,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -38,7 +38,6 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -214,13 +213,12 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
     Map<InstancePartitionsType, InstancePartitions> 
noRelocationInstancePartitionsMap =
         ImmutableMap.of(InstancePartitionsType.CONSUMING, 
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
     assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, 
noRelocationInstancePartitionsMap, null, null,
-        new BaseConfiguration()), currentAssignment);
+        new RebalanceConfig()), currentAssignment);
 
     // Rebalance with COMPLETED instance partitions should relocate all 
COMPLETED (ONLINE) segments to the COMPLETED
     // instances
     Map<String, Map<String, String>> newAssignment =
-        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null,
-            new BaseConfiguration());
+        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null, new RebalanceConfig());
     assertEquals(newAssignment.size(), NUM_SEGMENTS + 
uploadedSegmentNames.size() + 1);
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
@@ -255,19 +253,19 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
     }
 
     // Rebalance with COMPLETED instance partitions including CONSUMING 
segments should give the same assignment
-    BaseConfiguration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, 
true);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setIncludeConsuming(true);
     assertEquals(
         _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null, rebalanceConfig),
         newAssignment);
 
     // Rebalance without COMPLETED instance partitions again should change the 
segment assignment back
     assertEquals(_segmentAssignment.rebalanceTable(newAssignment, 
noRelocationInstancePartitionsMap, null, null,
-        new BaseConfiguration()), currentAssignment);
+        new RebalanceConfig()), currentAssignment);
 
     // Bootstrap table without COMPLETED instance partitions should be the 
same as regular rebalance
-    rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setBootstrap(true);
     assertEquals(_segmentAssignment.rebalanceTable(currentAssignment, 
noRelocationInstancePartitionsMap, null, null,
         rebalanceConfig), currentAssignment);
     assertEquals(_segmentAssignment.rebalanceTable(newAssignment, 
noRelocationInstancePartitionsMap, null, null,
@@ -417,8 +415,8 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
         ImmutableMap.of(InstancePartitionsType.CONSUMING, 
consumingInstancePartitions, InstancePartitionsType.COMPLETED,
             completedInstancePartitions);
-    BaseConfiguration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, 
true);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setIncludeConsuming(true);
     Map<String, Map<String, String>> newAssignment =
         _segmentAssignment.rebalanceTable(uploadedCurrentAssignment, 
instancePartitionsMap, null, null,
             rebalanceConfig);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java
index 7a75f8350d..f8df6b53eb 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java
@@ -35,6 +35,7 @@ import 
org.apache.pinot.controller.helix.core.assignment.segment.OfflineSegmentA
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -124,7 +125,7 @@ public class AllServersSegmentAssignmentStrategyTest {
     when(dataAccessor.getChildValues(builder.instanceConfigs(), 
true)).thenReturn(instanceConfigList);
 
     Map<String, Map<String, String>> newAssignment =
-        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null, null);
+        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null, new RebalanceConfig());
     assertEquals(newAssignment.get(SEGMENT_NAME).size(), NUM_INSTANCES - 1);
   }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java
index 277043230f..c62f03fef2 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java
@@ -24,19 +24,17 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.OfflineSegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -129,8 +127,8 @@ public class BalancedNumSegmentAssignmentStrategyTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Current assignment should already be balanced
-    assertEquals(_segmentAssignment
-            .rebalanceTable(currentAssignment, _instancePartitionsMap, null, 
null, new BaseConfiguration()),
+    assertEquals(
+        _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null, new RebalanceConfig()),
         currentAssignment);
   }
 
@@ -145,8 +143,8 @@ public class BalancedNumSegmentAssignmentStrategyTest {
     }
 
     // Bootstrap table should reassign all segments based on their 
alphabetical order
-    Configuration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setBootstrap(true);
     Map<String, Map<String, String>> newAssignment =
         _segmentAssignment.rebalanceTable(currentAssignment, 
_instancePartitionsMap, null, null, rebalanceConfig);
     assertEquals(newAssignment.size(), NUM_SEGMENTS);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java
index e944a3ab2f..ea4c88774b 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java
@@ -25,8 +25,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -39,6 +37,7 @@ import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -46,7 +45,6 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.BeforeClass;
@@ -260,10 +258,9 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Current assignment should already be balanced
-    assertEquals(_segmentAssignmentWithoutPartition
-            .rebalanceTable(currentAssignment, 
_instancePartitionsMapWithoutPartition, null, null,
-                new BaseConfiguration()),
-        currentAssignment);
+    assertEquals(
+        _segmentAssignmentWithoutPartition.rebalanceTable(currentAssignment, 
_instancePartitionsMapWithoutPartition,
+            null, null, new RebalanceConfig()), currentAssignment);
   }
 
   @Test
@@ -288,10 +285,9 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
     Arrays.fill(expectedNumSegmentsAssignedPerInstance, 
numSegmentsPerInstance);
     assertEquals(numSegmentsAssignedPerInstance, 
expectedNumSegmentsAssignedPerInstance);
     // Current assignment should already be balanced
-    assertEquals(_segmentAssignmentWithPartition
-            .rebalanceTable(currentAssignment, 
_instancePartitionsMapWithPartition, null, null,
-                new BaseConfiguration()),
-        currentAssignment);
+    assertEquals(
+        _segmentAssignmentWithPartition.rebalanceTable(currentAssignment, 
_instancePartitionsMapWithPartition, null,
+            null, new RebalanceConfig()), currentAssignment);
   }
 
   @Test
@@ -305,10 +301,11 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
     }
 
     // Bootstrap table should reassign all segments based on their 
alphabetical order
-    Configuration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
-    Map<String, Map<String, String>> newAssignment = 
_segmentAssignmentWithoutPartition
-        .rebalanceTable(currentAssignment, 
_instancePartitionsMapWithoutPartition, null, null, rebalanceConfig);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setBootstrap(true);
+    Map<String, Map<String, String>> newAssignment =
+        _segmentAssignmentWithoutPartition.rebalanceTable(currentAssignment, 
_instancePartitionsMapWithoutPartition,
+            null, null, rebalanceConfig);
     assertEquals(newAssignment.size(), NUM_SEGMENTS);
     List<String> sortedSegments = new ArrayList<>(SEGMENTS);
     sortedSegments.sort(null);
@@ -328,10 +325,11 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
     }
 
     // Bootstrap table should reassign all segments based on their 
alphabetical order within the partition
-    Configuration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
-    Map<String, Map<String, String>> newAssignment = 
_segmentAssignmentWithPartition
-        .rebalanceTable(currentAssignment, 
_instancePartitionsMapWithPartition, null, null, rebalanceConfig);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setBootstrap(true);
+    Map<String, Map<String, String>> newAssignment =
+        _segmentAssignmentWithPartition.rebalanceTable(currentAssignment, 
_instancePartitionsMapWithPartition, null,
+            null, rebalanceConfig);
     assertEquals(newAssignment.size(), NUM_SEGMENTS);
     int numSegmentsPerPartition = NUM_SEGMENTS / NUM_PARTITIONS;
     String[][] partitionIdToSegmentsMap = new 
String[NUM_PARTITIONS][numSegmentsPerPartition];
@@ -363,9 +361,9 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
     SEGMENTS.forEach(segName -> unbalancedAssignment.put(segName, ImmutableMap
         .of(instance0, SegmentStateModel.ONLINE, instance1, 
SegmentStateModel.ONLINE, instance2,
             SegmentStateModel.ONLINE)));
-    Map<String, Map<String, String>> balancedAssignment = 
_segmentAssignmentWithPartition
-        .rebalanceTable(unbalancedAssignment, 
_instancePartitionsMapWithoutPartition, null, null,
-            new BaseConfiguration());
+    Map<String, Map<String, String>> balancedAssignment =
+        _segmentAssignmentWithPartition.rebalanceTable(unbalancedAssignment, 
_instancePartitionsMapWithoutPartition,
+            null, null, new RebalanceConfig());
     int[] actualNumSegmentsAssignedPerInstance =
         
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(balancedAssignment, 
INSTANCES);
     int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
@@ -451,13 +449,13 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
 
     // Current assignment should already be balanced
     assertEquals(
-        segmentAssignment.rebalanceTable(currentAssignment, 
instancePartitionsMap, null, null, new BaseConfiguration()),
+        segmentAssignment.rebalanceTable(currentAssignment, 
instancePartitionsMap, null, null, new RebalanceConfig()),
         currentAssignment);
 
     // Test bootstrap
     // Bootstrap table should reassign all segments based on their 
alphabetical order within the partition
-    Configuration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setBootstrap(true);
     Map<String, Map<String, String>> newAssignment =
         segmentAssignment.rebalanceTable(currentAssignment, 
instancePartitionsMap, null, null, rebalanceConfig);
     assertEquals(newAssignment.size(), NUM_SEGMENTS);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 6bf4710578..efe47d6931 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -24,8 +24,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.tier.TierFactory;
@@ -42,7 +40,6 @@ import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitio
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.AfterClass;
@@ -100,7 +97,7 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest {
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
 
     // Rebalance should fail without creating the table
-    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new BaseConfiguration());
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
 
     // Create the table
@@ -117,7 +114,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
 
     // Rebalance should return NO_OP status
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
 
     // All servers should be assigned to the table
@@ -140,8 +137,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     }
 
     // Rebalance in dry-run mode
-    Configuration rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, true);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
     rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
 
@@ -170,8 +167,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         oldSegmentAssignment);
 
     // Rebalance with 3 min available replicas should fail as the table only 
have 3 replicas
-    rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, 3);
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setMinAvailableReplicas(3);
     rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
 
@@ -180,8 +177,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         oldSegmentAssignment);
 
     // Rebalance with 2 min available replicas should succeed
-    rebalanceConfig = new BaseConfiguration();
-    
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
 2);
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setMinAvailableReplicas(2);
     rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
 
@@ -202,7 +199,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     _helixResourceManager.updateTableConfig(tableConfig);
 
     // No need to reassign instances because instances should be automatically 
assigned when updating the table config
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
 
     // There should be 3 replica-groups, each with 2 servers
@@ -249,15 +246,15 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
 
     // Without instances reassignment, the rebalance should return status 
NO_OP, and the existing instance partitions
     // should be used
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
     assertEquals(rebalanceResult.getInstanceAssignment(), instanceAssignment);
     assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment);
 
     // With instances reassignment, the rebalance should return status DONE, 
the existing instance partitions should be
     // removed, and the default instance partitions should be used
-    rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, 
true);
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setReassignInstances(true);
     rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
     assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
@@ -284,8 +281,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     }
 
     // Rebalance with downtime should succeed
-    rebalanceConfig = new BaseConfiguration();
-    rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, true);
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDowntime(true);
     rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
 
@@ -350,7 +347,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
 
     TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
-    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
new BaseConfiguration());
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
new RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
     // Segment assignment should not change
     assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -366,7 +363,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, 
TIER_B_NAME, 3, 3, 0));
 
     // rebalance is NOOP and no change in assignment caused by new instances
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
     // Segment assignment should not change
     assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -384,7 +381,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     _helixResourceManager.updateTableConfig(tableConfig);
 
     // rebalance should change assignment
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
 
     // check that segments have moved to tiers
@@ -441,7 +438,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
 
     TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
-    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
new BaseConfiguration());
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
new RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
     // Segment assignment should not change
     assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -453,7 +450,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     }
     _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, 
"replicaAssignment" + TIER_A_NAME, 6, 6, 0));
     // rebalance is NOOP and no change in assignment caused by new instances
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
     // Segment assignment should not change
     assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -465,7 +462,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     _helixResourceManager.updateTableConfig(tableConfig);
 
     // rebalance should change assignment
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
 
     // check that segments have moved to tier a
@@ -487,7 +484,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
BaseConfiguration());
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig());
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
     
assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME));
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index 6bb7fbebb1..76189e9268 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -29,13 +29,13 @@ import 
org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.AfterClass;
@@ -97,10 +97,10 @@ public class TenantRebalancerTest extends ControllerTest {
         
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields();
 
     // rebalance the tables on test tenant
-    TenantRebalanceContext context = new TenantRebalanceContext();
-    context.setTenantName(TENANT_NAME);
-    context.setVerboseResult(true);
-    TenantRebalanceResult result = tenantRebalancer.rebalance(context);
+    TenantRebalanceConfig config = new TenantRebalanceConfig();
+    config.setTenantName(TENANT_NAME);
+    config.setVerboseResult(true);
+    TenantRebalanceResult result = tenantRebalancer.rebalance(config);
     RebalanceResult rebalanceResult = 
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
     Map<String, Map<String, String>> rebalancedAssignment = 
rebalanceResult.getSegmentAssignment();
     // assignment should not change, with a NO_OP status as no now server is 
added to test tenant
@@ -108,8 +108,8 @@ public class TenantRebalancerTest extends ControllerTest {
     assertEquals(oldSegmentAssignment, rebalancedAssignment);
 
     // rebalance the tables on default tenant
-    context.setTenantName(DEFAULT_TENANT_NAME);
-    result = tenantRebalancer.rebalance(context);
+    config.setTenantName(DEFAULT_TENANT_NAME);
+    result = tenantRebalancer.rebalance(config);
     // rebalancing default tenant should distribute the segment of table A 
over 6 servers
     rebalanceResult = 
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
     InstancePartitions partitions = 
rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE);
@@ -152,7 +152,8 @@ public class TenantRebalancerTest extends ControllerTest {
     if (controllerJobZKMetadata == null) {
       return null;
     }
-    return JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
+    return JsonUtils.stringToObject(
+        controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
         TenantRebalanceProgressStats.class);
   }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java
deleted file mode 100644
index 37cd4f3f25..0000000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.utils;
-
-/**
- * Constants for rebalance config properties
- */
-public class RebalanceConfigConstants {
-  private RebalanceConfigConstants() {
-  }
-
-  // Unique Id for rebalance
-  public static final String JOB_ID = "jobId";
-
-  // Progress of the Rebalance operartion
-  public static final String REBALANCE_PROGRESS_STATS = 
"REBALANCE_PROGRESS_STATS";
-
-  // Whether to rebalance table in dry-run mode
-  public static final String DRY_RUN = "dryRun";
-  public static final boolean DEFAULT_DRY_RUN = false;
-
-  // Whether to reassign instances before reassigning segments
-  public static final String REASSIGN_INSTANCES = "reassignInstances";
-  public static final boolean DEFAULT_REASSIGN_INSTANCES = false;
-
-  // Whether to reassign CONSUMING segments
-  public static final String INCLUDE_CONSUMING = "includeConsuming";
-  public static final boolean DEFAULT_INCLUDE_CONSUMING = false;
-
-  // Whether to rebalance table in bootstrap mode (regardless of minimum 
segment movement, reassign all segments in a
-  // round-robin fashion as if adding new segments to an empty table)
-  public static final String BOOTSTRAP = "bootstrap";
-  public static final boolean DEFAULT_BOOTSTRAP = false;
-
-  // Whether to allow downtime for the rebalance
-  public static final String DOWNTIME = "downtime";
-  public static final boolean DEFAULT_DOWNTIME = false;
-
-  // For no-downtime rebalance, minimum number of replicas to keep alive 
during rebalance, or maximum number of replicas
-  // allowed to be unavailable if value is negative
-  public static final String MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME = 
"minReplicasToKeepUpForNoDowntime";
-  public static final int DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME = 1;
-
-  // Whether to use best-efforts to rebalance (not fail the rebalance when the 
no-downtime contract cannot be achieved)
-  // When using best-efforts to rebalance, the following scenarios won't fail 
the rebalance (will log warnings instead):
-  // - Segment falls into ERROR state in ExternalView -> count ERROR state as 
good state
-  // - ExternalView has not converged within the maximum wait time -> continue 
to the next stage
-  public static final String BEST_EFFORTS = "bestEfforts";
-  public static final boolean DEFAULT_BEST_EFFORTS = false;
-
-  // The check on external view can be very costly when the table has very 
large ideal and external states, i.e. when
-  // having a huge number of segments. These two configs help reduce the cpu 
load on controllers, e.g. by doing the
-  // check less frequently and bail out sooner to rebalance at best effort if 
configured so.
-  public static final String EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS = 
"externalViewCheckIntervalInMs";
-  public static final long DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS = 
1_000L; // 1 second
-  public static final String EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS = 
"externalViewStabilizationTimeoutInMs";
-  public static final long DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS = 
60 * 60_000L; // 1 hour
-  public static final String UPDATE_TARGET_TIER = "updateTargetTier";
-  public static final boolean DEFAULT_UPDATE_TARGET_TIER = false;
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 3a57293288..a4c85505dc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -28,7 +28,6 @@ import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.spi.utils.StringUtil;
 
 
@@ -199,29 +198,26 @@ public class ControllerRequestURLBuilder {
   }
 
   public String forTableRebalance(String tableName, String tableType) {
-    return forTableRebalance(tableName, tableType, 
RebalanceConfigConstants.DEFAULT_DRY_RUN,
-        RebalanceConfigConstants.DEFAULT_REASSIGN_INSTANCES, 
RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING,
-        RebalanceConfigConstants.DEFAULT_DOWNTIME,
-        
RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME);
+    return forTableRebalance(tableName, tableType, false, false, false, false, 
1);
   }
 
   public String forTableRebalance(String tableName, String tableType, boolean 
dryRun, boolean reassignInstances,
       boolean includeConsuming, boolean downtime, int minAvailableReplicas) {
     StringBuilder stringBuilder =
         new StringBuilder(StringUtil.join("/", _baseUrl, "tables", tableName, 
"rebalance?type=" + tableType));
-    if (dryRun != RebalanceConfigConstants.DEFAULT_DRY_RUN) {
+    if (dryRun) {
       stringBuilder.append("&dryRun=").append(dryRun);
     }
-    if (reassignInstances != 
RebalanceConfigConstants.DEFAULT_REASSIGN_INSTANCES) {
+    if (reassignInstances) {
       stringBuilder.append("&reassignInstances=").append(reassignInstances);
     }
-    if (includeConsuming != 
RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING) {
+    if (includeConsuming) {
       stringBuilder.append("&includeConsuming=").append(includeConsuming);
     }
-    if (downtime != RebalanceConfigConstants.DEFAULT_DOWNTIME) {
+    if (downtime) {
       stringBuilder.append("&downtime=").append(downtime);
     }
-    if (minAvailableReplicas != 
RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME) {
+    if (minAvailableReplicas != 1) {
       
stringBuilder.append("&minAvailableReplicas=").append(minAvailableReplicas);
     }
     return stringBuilder.toString();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index 6d1d58f743..0a68242703 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -19,39 +19,33 @@
 package org.apache.pinot.tools;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 
 
 /**
  * Helper class for pinot-admin tool's RebalanceTable command.
  */
 public class PinotTableRebalancer extends PinotZKChanger {
-  private final Configuration _rebalanceConfig = new BaseConfiguration();
+  private final RebalanceConfig _rebalanceConfig = new RebalanceConfig();
 
   public PinotTableRebalancer(String zkAddress, String clusterName, boolean 
dryRun, boolean reassignInstances,
       boolean includeConsuming, boolean bootstrap, boolean downtime, int 
minReplicasToKeepUpForNoDowntime,
       boolean bestEffort, long externalViewCheckIntervalInMs, long 
externalViewStabilizationTimeoutInMs) {
     super(zkAddress, clusterName);
-    _rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, dryRun);
-    _rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES, 
reassignInstances);
-    _rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING, 
includeConsuming);
-    _rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP, 
bootstrap);
-    _rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, downtime);
-    
_rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
-        minReplicasToKeepUpForNoDowntime);
-    _rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS, 
bestEffort);
-    
_rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
-        externalViewCheckIntervalInMs);
-    
_rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
-        externalViewStabilizationTimeoutInMs);
-    _rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID,
-        TableRebalancer.createUniqueRebalanceJobIdentifier());
+    _rebalanceConfig.setDryRun(dryRun);
+    _rebalanceConfig.setReassignInstances(reassignInstances);
+    _rebalanceConfig.setIncludeConsuming(includeConsuming);
+    _rebalanceConfig.setBootstrap(bootstrap);
+    _rebalanceConfig.setDowntime(downtime);
+    _rebalanceConfig.setMinAvailableReplicas(minReplicasToKeepUpForNoDowntime);
+    _rebalanceConfig.setBestEfforts(bestEffort);
+    
_rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs);
+    
_rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs);
+    
_rebalanceConfig.setJobId(TableRebalancer.createUniqueRebalanceJobIdentifier());
   }
 
   public RebalanceResult rebalance(String tableNameWithType) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
index 258c66cea2..5b4c3260d4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pinot.tools.admin.command;
 
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
 import org.apache.pinot.tools.Command;
 import org.apache.pinot.tools.PinotTableRebalancer;
 import org.slf4j.Logger;
@@ -80,12 +80,12 @@ public class RebalanceTableCommand extends 
AbstractBaseAdminCommand implements C
 
   @CommandLine.Option(names = {"-externalViewCheckIntervalInMs"},
       description = "How often to check if external view converges with ideal 
view")
-  private long _externalViewCheckIntervalInMs = 
RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS;
+  private long _externalViewCheckIntervalInMs = 
RebalanceConfig.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS;
 
   @CommandLine.Option(names = {"-externalViewStabilizationTimeoutInMs"},
       description = "How long to wait till external view converges with ideal 
view")
   private long _externalViewStabilizationTimeoutInMs =
-      
RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS;
+      RebalanceConfig.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS;
 
   @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true, 
description = "Print this message")
   private boolean _help = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to