This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c96221f697 add RebalanceConfig class to replace use of
BaseConfiguration for rebalance configs (#11730)
c96221f697 is described below
commit c96221f697ab9347a1e5dbf145a9d6d660c53d62
Author: Xiaobing <[email protected]>
AuthorDate: Wed Oct 4 11:51:51 2023 -0700
add RebalanceConfig class to replace use of BaseConfiguration for rebalance configs (#11730)
* add RebalanceConfig class to replace use of Configuration for rebalance configs
* remove RebalanceConfigConstants
---
.../apache/pinot/controller/ControllerConf.java | 6 +-
.../api/resources/PinotTableRestletResource.java | 46 +++--
.../api/resources/PinotTenantRestletResource.java | 16 +-
.../helix/core/PinotHelixResourceManager.java | 14 +-
.../segment/OfflineSegmentAssignment.java | 9 +-
.../segment/RealtimeSegmentAssignment.java | 12 +-
.../core/assignment/segment/SegmentAssignment.java | 4 +-
.../helix/core/rebalance/RebalanceConfig.java | 193 +++++++++++++++++++++
.../helix/core/rebalance/RebalanceContext.java | 138 ---------------
...tRebalancer.java => RebalanceJobConstants.java} | 9 +-
.../helix/core/rebalance/TableRebalancer.java | 37 ++--
.../rebalance/ZkBasedTableRebalanceObserver.java | 3 +-
.../rebalance/tenant/DefaultTenantRebalancer.java | 79 ++++-----
...anceContext.java => TenantRebalanceConfig.java} | 6 +-
.../core/rebalance/tenant/TenantRebalancer.java | 3 +-
.../tenant/ZkBasedTenantRebalanceObserver.java | 4 +-
.../helix/core/relocation/SegmentRelocator.java | 19 +-
...NonReplicaGroupTieredSegmentAssignmentTest.java | 20 ++-
...altimeNonReplicaGroupSegmentAssignmentTest.java | 18 +-
...NonReplicaGroupTieredSegmentAssignmentTest.java | 20 +--
.../RealtimeReplicaGroupSegmentAssignmentTest.java | 22 ++-
.../AllServersSegmentAssignmentStrategyTest.java | 3 +-
.../BalancedNumSegmentAssignmentStrategyTest.java | 12 +-
.../ReplicaGroupSegmentAssignmentStrategyTest.java | 48 +++--
.../TableRebalancerClusterStatelessTest.java | 45 +++--
.../rebalance/tenant/TenantRebalancerTest.java | 17 +-
.../pinot/spi/utils/RebalanceConfigConstants.java | 76 --------
.../utils/builder/ControllerRequestURLBuilder.java | 16 +-
.../apache/pinot/tools/PinotTableRebalancer.java | 30 ++--
.../tools/admin/command/RebalanceTableCommand.java | 6 +-
30 files changed, 419 insertions(+), 512 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index fa6853191f..cb6ee99c4f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -30,10 +30,10 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import static
org.apache.pinot.spi.utils.CommonConstants.Controller.CONFIG_OF_CONTROLLER_METRICS_PREFIX;
@@ -962,12 +962,12 @@ public class ControllerConf extends PinotConfiguration {
public long getSegmentRelocatorExternalViewCheckIntervalInMs() {
return
getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
- RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS);
+ RebalanceConfig.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS);
}
public long getSegmentRelocatorExternalViewStabilizationTimeoutInMs() {
return
getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
-
RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS);
+ RebalanceConfig.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS);
}
public boolean isSegmentRelocatorRebalanceTablesSequentially() {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index b4dda6a56a..9cc7020a1d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -64,8 +64,6 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
@@ -94,6 +92,8 @@ import
org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
@@ -114,7 +114,6 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.zookeeper.data.Stat;
@@ -621,20 +620,18 @@ public class PinotTableRestletResource {
String tableNameWithType = constructTableNameWithType(tableName,
tableTypeStr);
- Configuration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, dryRun);
- rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES,
reassignInstances);
- rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING,
includeConsuming);
- rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP, bootstrap);
- rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, downtime);
-
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
minAvailableReplicas);
- rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS,
bestEfforts);
-
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
- externalViewCheckIntervalInMs);
-
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
- externalViewStabilizationTimeoutInMs);
- rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER,
updateTargetTier);
- rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID,
TableRebalancer.createUniqueRebalanceJobIdentifier());
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(dryRun);
+ rebalanceConfig.setReassignInstances(reassignInstances);
+ rebalanceConfig.setIncludeConsuming(includeConsuming);
+ rebalanceConfig.setBootstrap(bootstrap);
+ rebalanceConfig.setDowntime(downtime);
+ rebalanceConfig.setMinAvailableReplicas(minAvailableReplicas);
+ rebalanceConfig.setBestEfforts(bestEfforts);
+
rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs);
+
rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs);
+ rebalanceConfig.setUpdateTargetTier(updateTargetTier);
+
rebalanceConfig.setJobId(TableRebalancer.createUniqueRebalanceJobIdentifier());
try {
if (dryRun || downtime) {
@@ -642,13 +639,13 @@ public class PinotTableRestletResource {
return _pinotHelixResourceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, false);
} else {
// Make a dry-run first to get the target assignment
- rebalanceConfig.setProperty(RebalanceConfigConstants.DRY_RUN, true);
+ rebalanceConfig.setDryRun(true);
RebalanceResult dryRunResult =
_pinotHelixResourceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, false);
if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
// If dry-run succeeded, run rebalance asynchronously
- rebalanceConfig.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+ rebalanceConfig.setDryRun(false);
_executorService.submit(() -> {
try {
_pinotHelixResourceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, true);
@@ -744,13 +741,12 @@ public class PinotTableRestletResource {
throw new ControllerApplicationException(LOGGER, "Failed to find
controller job id: " + jobId,
Response.Status.NOT_FOUND);
}
- TableRebalanceProgressStats tableRebalanceProgressStats =
-
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
- TableRebalanceProgressStats.class);
+ TableRebalanceProgressStats tableRebalanceProgressStats =
JsonUtils.stringToObject(
+
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
+ TableRebalanceProgressStats.class);
long timeSinceStartInSecs = 0L;
- if
(!tableRebalanceProgressStats.getStatus().equals(RebalanceResult.Status.DONE)) {
- timeSinceStartInSecs =
- (System.currentTimeMillis() -
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
+ if
(!RebalanceResult.Status.DONE.toString().equals(tableRebalanceProgressStats.getStatus()))
{
+ timeSinceStartInSecs = (System.currentTimeMillis() -
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
}
ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new
ServerRebalanceJobStatusResponse();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index f160bba80d..3aa5da48e9 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -58,7 +58,8 @@ import org.apache.pinot.controller.api.access.Authenticate;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
-import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceContext;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
+import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceConfig;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceProgressStats;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceResult;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
@@ -70,7 +71,6 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -584,9 +584,9 @@ public class PinotTenantRestletResource {
@ApiOperation(value = "Rebalances all the tables that are part of the
tenant")
public TenantRebalanceResult rebalance(
@ApiParam(value = "Name of the tenant whose table are to be rebalanced",
required = true)
- @PathParam("tenantName") String tenantName, @ApiParam(required = true)
TenantRebalanceContext context) {
- context.setTenantName(tenantName);
- return _tenantRebalancer.rebalance(context);
+ @PathParam("tenantName") String tenantName, @ApiParam(required = true)
TenantRebalanceConfig config) {
+ config.setTenantName(tenantName);
+ return _tenantRebalancer.rebalance(config);
}
@GET
@@ -606,9 +606,9 @@ public class PinotTenantRestletResource {
throw new ControllerApplicationException(LOGGER, "Failed to find
controller job id: " + jobId,
Response.Status.NOT_FOUND);
}
- TenantRebalanceProgressStats tenantRebalanceProgressStats =
-
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
- TenantRebalanceProgressStats.class);
+ TenantRebalanceProgressStats tenantRebalanceProgressStats =
JsonUtils.stringToObject(
+
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
+ TenantRebalanceProgressStats.class);
long timeSinceStartInSecs =
tenantRebalanceProgressStats.getTimeToFinishInSeconds();
if (tenantRebalanceProgressStats.getCompletionStatusMsg() == null) {
timeSinceStartInSecs =
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 7ecd25aab1..c9bf0c52af 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -56,7 +56,6 @@ import javax.ws.rs.ClientErrorException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.Response;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
@@ -144,6 +143,7 @@ import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
import org.apache.pinot.controller.helix.core.lineage.LineageManager;
import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
@@ -175,7 +175,6 @@ import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateM
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -3081,21 +3080,20 @@ public class PinotHelixResourceManager {
* Entry point for table Rebalacing.
* @param tableNameWithType
* @param rebalanceConfig
- * @param trackRebalanceProgress - Do we want to track rebalance progress
stats
+ * @param trackRebalanceProgress whether to track rebalance progress stats
* @return RebalanceResult
* @throws TableNotFoundException
*/
- public RebalanceResult rebalanceTable(String tableNameWithType,
Configuration rebalanceConfig,
+ public RebalanceResult rebalanceTable(String tableNameWithType,
RebalanceConfig rebalanceConfig,
boolean trackRebalanceProgress)
throws TableNotFoundException {
TableConfig tableConfig = getTableConfig(tableNameWithType);
if (tableConfig == null) {
throw new TableNotFoundException("Failed to find table config for table:
" + tableNameWithType);
}
- String rebalanceJobId =
rebalanceConfig.getString(RebalanceConfigConstants.JOB_ID);
- Preconditions.checkState(rebalanceJobId != null, "RebalanceId not populated in the rebalanceConfig ");
- if (rebalanceConfig.getBoolean(RebalanceConfigConstants.UPDATE_TARGET_TIER,
- RebalanceConfigConstants.DEFAULT_UPDATE_TARGET_TIER)) {
+ String rebalanceJobId = rebalanceConfig.getJobId();
+ Preconditions.checkState(rebalanceJobId != null, "RebalanceId not populated in the rebalanceConfig");
+ if (rebalanceConfig.isUpdateTargetTier()) {
updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
}
ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index 36f784515a..05eb7d4048 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -23,15 +23,14 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.tier.Tier;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.AllServersSegmentAssignmentStrategy;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
/**
@@ -61,7 +60,7 @@ public class OfflineSegmentAssignment extends
BaseSegmentAssignment {
@Override
public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
@Nullable List<Tier> sortedTiers,
- @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
Configuration config) {
+ @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
RebalanceConfig config) {
InstancePartitions offlineInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
Preconditions
.checkState(offlineInstancePartitions != null, "Failed to find OFFLINE
instance partitions for table: %s",
@@ -78,9 +77,7 @@ public class OfflineSegmentAssignment extends
BaseSegmentAssignment {
return segmentAssignmentStrategy
.reassignSegments(currentAssignment, offlineInstancePartitions,
InstancePartitionsType.OFFLINE);
}
- boolean bootstrap =
- config.getBoolean(RebalanceConfigConstants.BOOTSTRAP,
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
-
+ boolean bootstrap = config.isBootstrap();
// Rebalance tiers first
Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String,
String>>> pair =
rebalanceTiers(currentAssignment, sortedTiers,
tierInstancePartitionsMap, bootstrap,
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index db8b375a4c..434dbec4e3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -25,15 +25,14 @@ import java.util.Map;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.tier.Tier;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategy;
import
org.apache.pinot.controller.helix.core.assignment.segment.strategy.SegmentAssignmentStrategyFactory;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
/**
@@ -174,17 +173,14 @@ public class RealtimeSegmentAssignment extends
BaseSegmentAssignment {
@Override
public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
@Nullable List<Tier> sortedTiers,
- @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
Configuration config) {
+ @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
RebalanceConfig config) {
InstancePartitions completedInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
InstancePartitions consumingInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
Preconditions
.checkState(consumingInstancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
_tableNameWithType);
- boolean includeConsuming = config
- .getBoolean(RebalanceConfigConstants.INCLUDE_CONSUMING,
RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING);
- boolean bootstrap =
- config.getBoolean(RebalanceConfigConstants.BOOTSTRAP,
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
-
+ boolean includeConsuming = config.isIncludeConsuming();
+ boolean bootstrap = config.isBootstrap();
// Rebalance tiers first
Pair<List<Map<String, Map<String, String>>>, Map<String, Map<String,
String>>> pair =
rebalanceTiers(currentAssignment, sortedTiers,
tierInstancePartitionsMap, bootstrap,
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
index c2f8f5d61b..12ec0b5545 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignment.java
@@ -21,11 +21,11 @@ package
org.apache.pinot.controller.helix.core.assignment.segment;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -65,5 +65,5 @@ public interface SegmentAssignment {
*/
Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String,
String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
@Nullable List<Tier> sortedTiers,
- @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
Configuration config);
+ @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap,
RebalanceConfig config);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
new file mode 100644
index 0000000000..b2a307f2e5
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+
+@ApiModel
+public class RebalanceConfig {
+ public static final int DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME = 1;
+ public static final long DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS = 1000L; // 1 second
+ public static final long DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS = 3600000L; // 1 hour
+
+ // Whether to rebalance table in dry-run mode
+ @JsonProperty("dryRun")
+ @ApiModelProperty(example = "false")
+ private boolean _dryRun = false;
+
+ // Whether to reassign instances before reassigning segments
+ @JsonProperty("reassignInstances")
+ @ApiModelProperty(example = "false")
+ private boolean _reassignInstances = false;
+
+ // Whether to reassign CONSUMING segments
+ @JsonProperty("includeConsuming")
+ @ApiModelProperty(example = "false")
+ private boolean _includeConsuming = false;
+
+ // Whether to rebalance table in bootstrap mode (regardless of minimum segment movement, reassign all segments in a
+ // round-robin fashion as if adding new segments to an empty table)
+ @JsonProperty("bootstrap")
+ @ApiModelProperty(example = "false")
+ private boolean _bootstrap = false;
+
+ // Whether to allow downtime for the rebalance
+ @JsonProperty("downtime")
+ @ApiModelProperty(example = "false")
+ private boolean _downtime = false;
+
+ // For no-downtime rebalance, minimum number of replicas to keep alive during rebalance, or maximum number of replicas
+ // allowed to be unavailable if value is negative
+ @JsonProperty("minAvailableReplicas")
+ @ApiModelProperty(example = "1")
+ private int _minAvailableReplicas =
DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME;
+
+ // Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime contract cannot be achieved)
+ // When using best-efforts to rebalance, the following scenarios won't fail the rebalance (will log warnings instead):
+ // - Segment falls into ERROR state in ExternalView -> count ERROR state as good state
+ // - ExternalView has not converged within the maximum wait time -> continue to the next stage
+ @JsonProperty("bestEfforts")
+ @ApiModelProperty(example = "false")
+ private boolean _bestEfforts = false;
+
+ // The check on external view can be very costly when the table has very large ideal and external states, i.e. when
+ // having a huge number of segments. These two configs help reduce the cpu load on controllers, e.g. by doing the
+ // check less frequently and bail out sooner to rebalance at best effort if configured so.
+ @JsonProperty("externalViewCheckIntervalInMs")
+ @ApiModelProperty(example = "1000")
+ private long _externalViewCheckIntervalInMs =
DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS;
+
+ @JsonProperty("externalViewStabilizationTimeoutInMs")
+ @ApiModelProperty(example = "3600000")
+ private long _externalViewStabilizationTimeoutInMs =
DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS;
+
+ @JsonProperty("updateTargetTier")
+ @ApiModelProperty(example = "false")
+ private boolean _updateTargetTier = false;
+
+ @JsonProperty("jobId")
+ private String _jobId = null;
+
+ public boolean isDryRun() {
+ return _dryRun;
+ }
+
+ public void setDryRun(boolean dryRun) {
+ _dryRun = dryRun;
+ }
+
+ public boolean isReassignInstances() {
+ return _reassignInstances;
+ }
+
+ public void setReassignInstances(boolean reassignInstances) {
+ _reassignInstances = reassignInstances;
+ }
+
+ public boolean isIncludeConsuming() {
+ return _includeConsuming;
+ }
+
+ public void setIncludeConsuming(boolean includeConsuming) {
+ _includeConsuming = includeConsuming;
+ }
+
+ public boolean isBootstrap() {
+ return _bootstrap;
+ }
+
+ public void setBootstrap(boolean bootstrap) {
+ _bootstrap = bootstrap;
+ }
+
+ public boolean isDowntime() {
+ return _downtime;
+ }
+
+ public void setDowntime(boolean downtime) {
+ _downtime = downtime;
+ }
+
+ public int getMinAvailableReplicas() {
+ return _minAvailableReplicas;
+ }
+
+ public void setMinAvailableReplicas(int minAvailableReplicas) {
+ _minAvailableReplicas = minAvailableReplicas;
+ }
+
+ public boolean isBestEfforts() {
+ return _bestEfforts;
+ }
+
+ public void setBestEfforts(boolean bestEfforts) {
+ _bestEfforts = bestEfforts;
+ }
+
+ public long getExternalViewCheckIntervalInMs() {
+ return _externalViewCheckIntervalInMs;
+ }
+
+ public void setExternalViewCheckIntervalInMs(long
externalViewCheckIntervalInMs) {
+ _externalViewCheckIntervalInMs = externalViewCheckIntervalInMs;
+ }
+
+ public long getExternalViewStabilizationTimeoutInMs() {
+ return _externalViewStabilizationTimeoutInMs;
+ }
+
+ public void setExternalViewStabilizationTimeoutInMs(long
externalViewStabilizationTimeoutInMs) {
+ _externalViewStabilizationTimeoutInMs =
externalViewStabilizationTimeoutInMs;
+ }
+
+ public boolean isUpdateTargetTier() {
+ return _updateTargetTier;
+ }
+
+ public void setUpdateTargetTier(boolean updateTargetTier) {
+ _updateTargetTier = updateTargetTier;
+ }
+
+ public String getJobId() {
+ return _jobId;
+ }
+
+ public void setJobId(String jobId) {
+ _jobId = jobId;
+ }
+
+ public static RebalanceConfig copy(RebalanceConfig cfg) {
+ RebalanceConfig rc = new RebalanceConfig();
+ rc._dryRun = cfg._dryRun;
+ rc._reassignInstances = cfg._reassignInstances;
+ rc._includeConsuming = cfg._includeConsuming;
+ rc._bootstrap = cfg._bootstrap;
+ rc._downtime = cfg._downtime;
+ rc._minAvailableReplicas = cfg._minAvailableReplicas;
+ rc._bestEfforts = cfg._bestEfforts;
+ rc._externalViewCheckIntervalInMs = cfg._externalViewCheckIntervalInMs;
+ rc._externalViewStabilizationTimeoutInMs =
cfg._externalViewStabilizationTimeoutInMs;
+ rc._updateTargetTier = cfg._updateTargetTier;
+ rc._jobId = cfg._jobId;
+ return rc;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
deleted file mode 100644
index cd6e06c399..0000000000
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceContext.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.rebalance;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-
-@ApiModel
-public class RebalanceContext {
- // TODO : simplify the rebalance configs wherever possible
- @JsonProperty("dryRun")
- @ApiModelProperty(example = "false")
- private Boolean _dryRun = false;
- @JsonProperty("reassignInstances")
- @ApiModelProperty(example = "false")
- private Boolean _reassignInstances = false;
- @JsonProperty("includeConsuming")
- @ApiModelProperty(example = "false")
- private Boolean _includeConsuming = false;
- @JsonProperty("bootstrap")
- @ApiModelProperty(example = "false")
- private Boolean _bootstrap = false;
- @JsonProperty("downtime")
- @ApiModelProperty(example = "false")
- private Boolean _downtime = false;
- @JsonProperty("minAvailableReplicas")
- @ApiModelProperty(example = "1")
- private Integer _minAvailableReplicas = 1;
- @JsonProperty("bestEfforts")
- @ApiModelProperty(example = "false")
- private Boolean _bestEfforts = false;
- @JsonProperty("externalViewCheckIntervalInMs")
- @ApiModelProperty(example = "1000")
- private Long _externalViewCheckIntervalInMs = 1000L;
- @JsonProperty("externalViewStabilizationTimeoutInMs")
- @ApiModelProperty(example = "3600000")
- private Long _externalViewStabilizationTimeoutInMs = 3600000L;
- @JsonProperty("updateTargetTier")
- @ApiModelProperty(example = "false")
- private Boolean _updateTargetTier = false;
-
- public Boolean isDryRun() {
- return _dryRun;
- }
-
- public void setDryRun(Boolean dryRun) {
- _dryRun = dryRun;
- }
-
- public Boolean isReassignInstances() {
- return _reassignInstances;
- }
-
- public void setReassignInstances(Boolean reassignInstances) {
- _reassignInstances = reassignInstances;
- }
-
- public Boolean isIncludeConsuming() {
- return _includeConsuming;
- }
-
- public void setIncludeConsuming(Boolean includeConsuming) {
- _includeConsuming = includeConsuming;
- }
-
- public Boolean isBootstrap() {
- return _bootstrap;
- }
-
- public void setBootstrap(Boolean bootstrap) {
- _bootstrap = bootstrap;
- }
-
- public Boolean isDowntime() {
- return _downtime;
- }
-
- public void setDowntime(Boolean downtime) {
- _downtime = downtime;
- }
-
- public Integer getMinAvailableReplicas() {
- return _minAvailableReplicas;
- }
-
- public void setMinAvailableReplicas(Integer minAvailableReplicas) {
- _minAvailableReplicas = minAvailableReplicas;
- }
-
- public Boolean isBestEfforts() {
- return _bestEfforts;
- }
-
- public void setBestEfforts(Boolean bestEfforts) {
- _bestEfforts = bestEfforts;
- }
-
- public Long getExternalViewCheckIntervalInMs() {
- return _externalViewCheckIntervalInMs;
- }
-
- public void setExternalViewCheckIntervalInMs(Long
externalViewCheckIntervalInMs) {
- _externalViewCheckIntervalInMs = externalViewCheckIntervalInMs;
- }
-
- public Long getExternalViewStabilizationTimeoutInMs() {
- return _externalViewStabilizationTimeoutInMs;
- }
-
- public void setExternalViewStabilizationTimeoutInMs(Long
externalViewStabilizationTimeoutInMs) {
- _externalViewStabilizationTimeoutInMs =
externalViewStabilizationTimeoutInMs;
- }
-
- public Boolean isUpdateTargetTier() {
- return _updateTargetTier;
- }
-
- public void setUpdateTargetTier(Boolean updateTargetTier) {
- _updateTargetTier = updateTargetTier;
- }
-}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
similarity index 74%
copy from
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
copy to
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
index 53df7824d5..8070d44972 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceJobConstants.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.controller.helix.core.rebalance.tenant;
+package org.apache.pinot.controller.helix.core.rebalance;
+public class RebalanceJobConstants {
+ private RebalanceJobConstants() {
+ }
-public interface TenantRebalancer {
- TenantRebalanceResult rebalance(TenantRebalanceContext context);
+ // Progress status of the rebalance operation
+ public static final String JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS =
"REBALANCE_PROGRESS_STATS";
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 1a3b7e12f6..74e0f2f5a6 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -33,7 +33,6 @@ import java.util.concurrent.TimeoutException;
import java.util.function.ToIntFunction;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.helix.AccessOption;
@@ -64,7 +63,6 @@ import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,40 +136,27 @@ public class TableRebalancer {
return UUID.randomUUID().toString();
}
- public RebalanceResult rebalance(TableConfig tableConfig, Configuration
rebalanceConfig) {
+ public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig
rebalanceConfig) {
long startTimeMs = System.currentTimeMillis();
String tableNameWithType = tableConfig.getTableName();
- String rebalanceJobId =
rebalanceConfig.getString(RebalanceConfigConstants.JOB_ID);
+ String rebalanceJobId = rebalanceConfig.getJobId();
if (rebalanceJobId == null) {
// If not passed along, create one.
// TODO - Add rebalanceJobId to all log messages for easy tracking.
rebalanceJobId = createUniqueRebalanceJobIdentifier();
}
-
- boolean dryRun =
- rebalanceConfig.getBoolean(RebalanceConfigConstants.DRY_RUN,
RebalanceConfigConstants.DEFAULT_DRY_RUN);
- boolean reassignInstances =
rebalanceConfig.getBoolean(RebalanceConfigConstants.REASSIGN_INSTANCES,
- RebalanceConfigConstants.DEFAULT_REASSIGN_INSTANCES);
- boolean includeConsuming =
rebalanceConfig.getBoolean(RebalanceConfigConstants.INCLUDE_CONSUMING,
- RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING);
- boolean bootstrap =
- rebalanceConfig.getBoolean(RebalanceConfigConstants.BOOTSTRAP,
RebalanceConfigConstants.DEFAULT_BOOTSTRAP);
- boolean downtime =
- rebalanceConfig.getBoolean(RebalanceConfigConstants.DOWNTIME,
RebalanceConfigConstants.DEFAULT_DOWNTIME);
- int minReplicasToKeepUpForNoDowntime =
-
rebalanceConfig.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
-
RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME);
+ boolean dryRun = rebalanceConfig.isDryRun();
+ boolean reassignInstances = rebalanceConfig.isReassignInstances();
+ boolean includeConsuming = rebalanceConfig.isIncludeConsuming();
+ boolean bootstrap = rebalanceConfig.isBootstrap();
+ boolean downtime = rebalanceConfig.isDowntime();
+ int minReplicasToKeepUpForNoDowntime =
rebalanceConfig.getMinAvailableReplicas();
+ boolean bestEfforts = rebalanceConfig.isBestEfforts();
+ long externalViewCheckIntervalInMs =
rebalanceConfig.getExternalViewCheckIntervalInMs();
+ long externalViewStabilizationTimeoutInMs =
rebalanceConfig.getExternalViewStabilizationTimeoutInMs();
boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
&&
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
tableConfig.getRoutingConfig().getInstanceSelectorType());
- boolean bestEfforts =
rebalanceConfig.getBoolean(RebalanceConfigConstants.BEST_EFFORTS,
- RebalanceConfigConstants.DEFAULT_BEST_EFFORTS);
- long externalViewCheckIntervalInMs =
-
rebalanceConfig.getLong(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
-
RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS);
- long externalViewStabilizationTimeoutInMs =
-
rebalanceConfig.getLong(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
-
RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS);
LOGGER.info(
"Start rebalancing table: {} with dryRun: {}, reassignInstances: {},
includeConsuming: {}, bootstrap: {}, "
+ "downtime: {}, minReplicasToKeepUpForNoDowntime: {},
enableStrictReplicaGroup: {}, bestEfforts: {}, "
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 5db4bebf58..1978df35f3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -27,7 +27,6 @@ import
org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,7 +133,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TABLE_REBALANCE.name());
try {
- jobMetadata.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS,
+
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(_tableRebalanceProgressStats));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _rebalanceJobId, e);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
index d0278c15e3..04d18230ae 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -29,13 +29,11 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,21 +48,22 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
}
@Override
- public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
+ public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
Map<String, RebalanceResult> rebalanceResult = new HashMap<>();
- Set<String> tables = getTenantTables(context.getTenantName());
+ Set<String> tables = getTenantTables(config.getTenantName());
tables.forEach(table -> {
try {
- Configuration config = extractRebalanceConfig(context);
- config.setProperty(RebalanceConfigConstants.DRY_RUN, true);
- rebalanceResult.put(table,
_pinotHelixResourceManager.rebalanceTable(table, config, false));
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setJobId(createUniqueRebalanceJobIdentifier());
+ rebalanceConfig.setDryRun(true);
+ rebalanceResult.put(table,
_pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig, false));
} catch (TableNotFoundException exception) {
rebalanceResult.put(table, new RebalanceResult(null,
RebalanceResult.Status.FAILED, exception.getMessage(),
null, null, null));
}
});
- if (context.isDryRun() || context.isDowntime()) {
- return new TenantRebalanceResult(null, rebalanceResult,
context.isVerboseResult());
+ if (config.isDryRun() || config.isDowntime()) {
+ return new TenantRebalanceResult(null, rebalanceResult,
config.isVerboseResult());
} else {
for (String table : rebalanceResult.keySet()) {
RebalanceResult result = rebalanceResult.get(table);
@@ -77,25 +76,25 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
}
String tenantRebalanceJobId = createUniqueRebalanceJobIdentifier();
- TenantRebalanceObserver observer = new
ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(),
+ TenantRebalanceObserver observer = new
ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, config.getTenantName(),
tables, _pinotHelixResourceManager);
observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null,
null);
final Deque<String> sequentialQueue = new LinkedList<>();
final Deque<String> parallelQueue = new ConcurrentLinkedDeque<>();
// ensure at least 1 thread is created to run the sequential table
rebalance operations
- int parallelism = Math.max(context.getDegreeOfParallelism(), 1);
- Set<String> dimTables = getDimensionalTables(context.getTenantName());
+ int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+ Set<String> dimTables = getDimensionalTables(config.getTenantName());
AtomicInteger activeThreads = new AtomicInteger(parallelism);
try {
if (parallelism > 1) {
Set<String> parallelTables;
- if (!context.getParallelWhitelist().isEmpty()) {
- parallelTables = new HashSet<>(context.getParallelWhitelist());
+ if (!config.getParallelWhitelist().isEmpty()) {
+ parallelTables = new HashSet<>(config.getParallelWhitelist());
} else {
parallelTables = new HashSet<>(tables);
}
- if (!context.getParallelBlacklist().isEmpty()) {
- parallelTables = Sets.difference(parallelTables,
context.getParallelBlacklist());
+ if (!config.getParallelBlacklist().isEmpty()) {
+ parallelTables = Sets.difference(parallelTables,
config.getParallelBlacklist());
}
parallelTables.forEach(table -> {
if (dimTables.contains(table)) {
@@ -131,33 +130,33 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
if (table == null) {
break;
}
- Configuration config = extractRebalanceConfig(context);
- config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
- config.setProperty(RebalanceConfigConstants.JOB_ID,
rebalanceResult.get(table).getJobId());
- rebalanceTable(table, config, observer);
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setJobId(rebalanceResult.get(table).getJobId());
+ rebalanceTable(table, rebalanceConfig, observer);
}
// Last parallel thread to finish the table rebalance job will pick
up the
// sequential table rebalance execution
if (activeThreads.decrementAndGet() == 0) {
- Configuration config = extractRebalanceConfig(context);
- config.setProperty(RebalanceConfigConstants.DRY_RUN, false);
+ RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
+ rebalanceConfig.setDryRun(false);
while (true) {
String table = sequentialQueue.pollFirst();
if (table == null) {
break;
}
- config.setProperty(RebalanceConfigConstants.JOB_ID,
rebalanceResult.get(table).getJobId());
- rebalanceTable(table, config, observer);
+ rebalanceConfig.setJobId(rebalanceResult.get(table).getJobId());
+ rebalanceTable(table, rebalanceConfig, observer);
}
- observer.onSuccess(String.format("Successfully rebalanced tenant
%s.", context.getTenantName()));
+ observer.onSuccess(String.format("Successfully rebalanced tenant
%s.", config.getTenantName()));
}
});
}
} catch (Exception exception) {
- observer.onError(String.format("Failed to rebalance the tenant %s.
Cause: %s", context.getTenantName(),
+ observer.onError(String.format("Failed to rebalance the tenant %s.
Cause: %s", config.getTenantName(),
exception.getMessage()));
}
- return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult,
context.isVerboseResult());
+ return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult,
config.isVerboseResult());
}
private Set<String> getDimensionalTables(String tenantName) {
@@ -175,25 +174,6 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
return dimTables;
}
- private Configuration extractRebalanceConfig(TenantRebalanceContext context)
{
- Configuration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN,
context.isDryRun());
- rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES,
context.isReassignInstances());
- rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING,
context.isIncludeConsuming());
- rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP,
context.isBootstrap());
- rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME,
context.isDowntime());
-
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
- context.getMinAvailableReplicas());
- rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS,
context.isBestEfforts());
-
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
- context.getExternalViewCheckIntervalInMs());
-
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
- context.getExternalViewStabilizationTimeoutInMs());
- rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER,
context.isUpdateTargetTier());
- rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID,
createUniqueRebalanceJobIdentifier());
- return rebalanceConfig;
- }
-
private String createUniqueRebalanceJobIdentifier() {
return UUID.randomUUID().toString();
}
@@ -214,11 +194,10 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
return tables;
}
- private void rebalanceTable(String tableName, Configuration config,
+ private void rebalanceTable(String tableName, RebalanceConfig config,
TenantRebalanceObserver observer) {
try {
-
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER,
tableName,
- config.getString(RebalanceConfigConstants.JOB_ID));
+
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER,
tableName, config.getJobId());
RebalanceResult result =
_pinotHelixResourceManager.rebalanceTable(tableName, config, true);
if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER,
tableName, null);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java
similarity index 95%
rename from
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
rename to
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java
index 5e76dcc014..3ca6bd777e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceConfig.java
@@ -23,15 +23,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModelProperty;
import java.util.HashSet;
import java.util.Set;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceContext;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
-public class TenantRebalanceContext extends RebalanceContext {
+public class TenantRebalanceConfig extends RebalanceConfig {
@JsonIgnore
private String _tenantName;
@JsonProperty("degreeOfParallelism")
@ApiModelProperty(example = "1")
- private Integer _degreeOfParallelism = 1;
+ private int _degreeOfParallelism = 1;
@JsonProperty("parallelWhitelist")
private Set<String> _parallelWhitelist = new HashSet<>();
@JsonProperty("parallelBlacklist")
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
index 53df7824d5..4b28b30569 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.controller.helix.core.rebalance.tenant;
-
public interface TenantRebalancer {
- TenantRebalanceResult rebalance(TenantRebalanceContext context);
+ TenantRebalanceResult rebalance(TenantRebalanceConfig config);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
index 7521caa3f3..6c26985539 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/ZkBasedTenantRebalanceObserver.java
@@ -28,9 +28,9 @@ import java.util.Set;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +103,7 @@ public class ZkBasedTenantRebalanceObserver implements
TenantRebalanceObserver {
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(System.currentTimeMillis()));
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.TENANT_REBALANCE.name());
try {
- jobMetadata.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS,
+
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
JsonUtils.objectToString(_progressStats));
} catch (JsonProcessingException e) {
LOGGER.error("Error serialising rebalance stats to JSON for persisting
to ZK {}", _jobId, e);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index ad9cd1f0f5..0a77f9ff1f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -30,8 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.InstanceType;
@@ -44,12 +42,12 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,15 +164,12 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
}
// Allow at most one replica unavailable during relocation
- Configuration rebalanceConfig = new BaseConfiguration();
-
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
-1);
-
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
- _externalViewCheckIntervalInMs);
-
rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
- _externalViewStabilizationTimeoutInMs);
- rebalanceConfig.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER,
- TierConfigUtils.shouldRelocateToTiers(tableConfig));
- rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID,
TableRebalancer.createUniqueRebalanceJobIdentifier());
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setMinAvailableReplicas(-1);
+
rebalanceConfig.setExternalViewCheckIntervalInMs(_externalViewCheckIntervalInMs);
+
rebalanceConfig.setExternalViewStabilizationTimeoutInMs(_externalViewStabilizationTimeoutInMs);
+
rebalanceConfig.setUpdateTargetTier(TierConfigUtils.shouldRelocateToTiers(tableConfig));
+
rebalanceConfig.setJobId(TableRebalancer.createUniqueRebalanceJobIdentifier());
try {
// Relocating segments to new tiers needs two sequential actions: table
rebalance and local tier migration.
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
index af75b05c23..c9a42536d3 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -25,20 +25,19 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
+import java.util.TreeSet;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.tier.PinotServerTierStorage;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.tier.TierSegmentSelector;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -165,7 +164,7 @@ public class
OfflineNonReplicaGroupTieredSegmentAssignmentTest {
// On rebalancing, segments move to tiers
Map<String, Map<String, String>> newAssignment =
_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, _sortedTiers,
- _tierInstancePartitionsMap, new BaseConfiguration());
+ _tierInstancePartitionsMap, new RebalanceConfig());
assertEquals(newAssignment.size(), NUM_SEGMENTS);
// segments 0-49 remain unchanged
@@ -205,7 +204,7 @@ public class
OfflineNonReplicaGroupTieredSegmentAssignmentTest {
// rebalance without tierInstancePartitions resets the assignment
Map<String, Map<String, String>> resetAssignment =
- _segmentAssignment.rebalanceTable(newAssignment,
_instancePartitionsMap, null, null, new BaseConfiguration());
+ _segmentAssignment.rebalanceTable(newAssignment,
_instancePartitionsMap, null, null, new RebalanceConfig());
for (String segment : SEGMENTS) {
Assert.assertTrue(INSTANCES.containsAll(resetAssignment.get(segment).keySet()));
}
@@ -214,7 +213,10 @@ public class
OfflineNonReplicaGroupTieredSegmentAssignmentTest {
@Test
public void testBootstrapTable() {
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
- for (String segmentName : SEGMENTS) {
+ // The list of segments is segment_0, segment_1, ... segment_10, ...; but
TreeMap sorts segments as segment_0,
+ // segment_1, segment_10, ... segment_2, ... So to make bootstrap generate
the same assignment as assigning each
+ // segment separately, we need to process the segments in the same order.
+ for (String segmentName : new TreeSet<>(SEGMENTS)) {
List<String> instancesAssigned =
_segmentAssignment.assignSegment(segmentName, currentAssignment,
_instancePartitionsMap);
currentAssignment.put(segmentName,
@@ -222,11 +224,11 @@ public class
OfflineNonReplicaGroupTieredSegmentAssignmentTest {
}
// Bootstrap table should reassign all segments
- Configuration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setBootstrap(true);
Map<String, Map<String, String>> newAssignment =
_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, _sortedTiers,
- _tierInstancePartitionsMap, new BaseConfiguration());
+ _tierInstancePartitionsMap, rebalanceConfig);
assertEquals(newAssignment.size(), NUM_SEGMENTS);
// segments 0-49 remain unchanged
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 4ab378e3a7..9d921d3af8 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -24,18 +24,17 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -227,13 +226,12 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest
{
Map<InstancePartitionsType, InstancePartitions>
noRelocationInstancePartitionsMap =
ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
assertEquals(_segmentAssignment.rebalanceTable(currentAssignment,
noRelocationInstancePartitionsMap, null, null,
- new BaseConfiguration()), currentAssignment);
+ new RebalanceConfig()), currentAssignment);
// Rebalance with COMPLETED instance partitions should relocate all
COMPLETED (ONLINE) segments to the COMPLETED
// instances
Map<String, Map<String, String>> newAssignment =
- _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null,
- new BaseConfiguration());
+ _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null, new RebalanceConfig());
assertEquals(newAssignment.size(), NUM_SEGMENTS + numUploadedSegments + 1);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
@@ -270,8 +268,8 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest {
assertEquals(totalNumSegmentsAssigned, expectedTotalNumSegmentsAssigned);
// Rebalance with COMPLETED instance partitions including CONSUMING
segments should give the same assignment
- BaseConfiguration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING,
true);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setIncludeConsuming(true);
assertEquals(
_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null, rebalanceConfig),
newAssignment);
@@ -279,11 +277,11 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest
{
// Rebalance without COMPLETED instance partitions again should change the
segment assignment back
currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
assertEquals(_segmentAssignment.rebalanceTable(newAssignment,
noRelocationInstancePartitionsMap, null, null,
- new BaseConfiguration()), currentAssignment);
+ new RebalanceConfig()), currentAssignment);
// Bootstrap table without COMPLETED instance partitions should be the
same as regular rebalance
- rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setBootstrap(true);
assertEquals(_segmentAssignment.rebalanceTable(currentAssignment,
noRelocationInstancePartitionsMap, null, null,
rebalanceConfig), currentAssignment);
assertEquals(_segmentAssignment.rebalanceTable(newAssignment,
noRelocationInstancePartitionsMap, null, null,
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
index f540350e64..e03a8d4a46 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.tier.PinotServerTierStorage;
@@ -33,13 +32,13 @@ import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.tier.TierSegmentSelector;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -184,8 +183,7 @@ public class
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
// Rebalance without tier instancePartitions moves all instances to
COMPLETED
Map<String, Map<String, String>> newAssignment =
- _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null,
- new BaseConfiguration());
+ _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null, new RebalanceConfig());
assertEquals(newAssignment.size(), NUM_SEGMENTS);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { // ONLINE segments
@@ -208,7 +206,7 @@ public class
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
int expectedOnTierB = 20;
int expectedOnCompleted = NUM_SEGMENTS - NUM_PARTITIONS - expectedOnTierA
- expectedOnTierB;
newAssignment = _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, _sortedTiers,
- _tierInstancePartitionsMap, new BaseConfiguration());
+ _tierInstancePartitionsMap, new RebalanceConfig());
assertEquals(newAssignment.size(), NUM_SEGMENTS);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
@@ -259,8 +257,8 @@ public class
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
}
// Rebalance with including CONSUMING should give the same assignment
- BaseConfiguration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING,
true);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setIncludeConsuming(true);
assertEquals(_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, _sortedTiers,
_tierInstancePartitionsMap, rebalanceConfig), newAssignment);
@@ -270,13 +268,13 @@ public class
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
noRelocationInstancePartitionsMap.put(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
assertEquals(_segmentAssignment.rebalanceTable(newAssignment,
noRelocationInstancePartitionsMap, null, null,
- new BaseConfiguration()), currentAssignment);
+ new RebalanceConfig()), currentAssignment);
// Rebalance without COMPLETED instance partitions and with
tierInstancePartitions should move ONLINE segments to
// Tiers and CONSUMING segments to CONSUMING tenant.
newAssignment =
_segmentAssignment.rebalanceTable(currentAssignment,
noRelocationInstancePartitionsMap, _sortedTiers,
- _tierInstancePartitionsMap, new BaseConfiguration());
+ _tierInstancePartitionsMap, new RebalanceConfig());
numSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment,
INSTANCES_TIER_A);
@@ -299,8 +297,8 @@ public class
RealtimeNonReplicaGroupTieredSegmentAssignmentTest {
assertEquals(numSegmentsAssignedPerInstance.length,
NUM_CONSUMING_INSTANCES);
// Bootstrap
- rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setBootstrap(true);
newAssignment = _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, _sortedTiers,
_tierInstancePartitionsMap, rebalanceConfig);
int index = 0;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index a6da0552cd..2c590ef25e 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -25,12 +25,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -38,7 +38,6 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -214,13 +213,12 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
Map<InstancePartitionsType, InstancePartitions>
noRelocationInstancePartitionsMap =
ImmutableMap.of(InstancePartitionsType.CONSUMING,
_instancePartitionsMap.get(InstancePartitionsType.CONSUMING));
assertEquals(_segmentAssignment.rebalanceTable(currentAssignment,
noRelocationInstancePartitionsMap, null, null,
- new BaseConfiguration()), currentAssignment);
+ new RebalanceConfig()), currentAssignment);
// Rebalance with COMPLETED instance partitions should relocate all
COMPLETED (ONLINE) segments to the COMPLETED
// instances
Map<String, Map<String, String>> newAssignment =
- _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null,
- new BaseConfiguration());
+ _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null, new RebalanceConfig());
assertEquals(newAssignment.size(), NUM_SEGMENTS +
uploadedSegmentNames.size() + 1);
for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) {
@@ -255,19 +253,19 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
}
// Rebalance with COMPLETED instance partitions including CONSUMING
segments should give the same assignment
- BaseConfiguration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING,
true);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setIncludeConsuming(true);
assertEquals(
_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null, rebalanceConfig),
newAssignment);
// Rebalance without COMPLETED instance partitions again should change the
segment assignment back
assertEquals(_segmentAssignment.rebalanceTable(newAssignment,
noRelocationInstancePartitionsMap, null, null,
- new BaseConfiguration()), currentAssignment);
+ new RebalanceConfig()), currentAssignment);
// Bootstrap table without COMPLETED instance partitions should be the
same as regular rebalance
- rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setBootstrap(true);
assertEquals(_segmentAssignment.rebalanceTable(currentAssignment,
noRelocationInstancePartitionsMap, null, null,
rebalanceConfig), currentAssignment);
assertEquals(_segmentAssignment.rebalanceTable(newAssignment,
noRelocationInstancePartitionsMap, null, null,
@@ -417,8 +415,8 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
ImmutableMap.of(InstancePartitionsType.CONSUMING,
consumingInstancePartitions, InstancePartitionsType.COMPLETED,
completedInstancePartitions);
- BaseConfiguration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.INCLUDE_CONSUMING,
true);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setIncludeConsuming(true);
Map<String, Map<String, String>> newAssignment =
_segmentAssignment.rebalanceTable(uploadedCurrentAssignment,
instancePartitionsMap, null, null,
rebalanceConfig);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java
index 7a75f8350d..f8df6b53eb 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/AllServersSegmentAssignmentStrategyTest.java
@@ -35,6 +35,7 @@ import
org.apache.pinot.controller.helix.core.assignment.segment.OfflineSegmentA
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -124,7 +125,7 @@ public class AllServersSegmentAssignmentStrategyTest {
when(dataAccessor.getChildValues(builder.instanceConfigs(),
true)).thenReturn(instanceConfigList);
Map<String, Map<String, String>> newAssignment =
- _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null, null);
+ _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null, new RebalanceConfig());
assertEquals(newAssignment.get(SEGMENT_NAME).size(), NUM_INSTANCES - 1);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java
index 277043230f..c62f03fef2 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategyTest.java
@@ -24,19 +24,17 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.assignment.InstancePartitions;
import
org.apache.pinot.controller.helix.core.assignment.segment.OfflineSegmentAssignment;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -129,8 +127,8 @@ public class BalancedNumSegmentAssignmentStrategyTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
- assertEquals(_segmentAssignment
- .rebalanceTable(currentAssignment, _instancePartitionsMap, null,
null, new BaseConfiguration()),
+ assertEquals(
+ _segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null, new RebalanceConfig()),
currentAssignment);
}
@@ -145,8 +143,8 @@ public class BalancedNumSegmentAssignmentStrategyTest {
}
// Bootstrap table should reassign all segments based on their
alphabetical order
- Configuration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setBootstrap(true);
Map<String, Map<String, String>> newAssignment =
_segmentAssignment.rebalanceTable(currentAssignment,
_instancePartitionsMap, null, null, rebalanceConfig);
assertEquals(newAssignment.size(), NUM_SEGMENTS);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java
index e944a3ab2f..ea4c88774b 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategyTest.java
@@ -25,8 +25,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -39,6 +37,7 @@ import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentTestUtils;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -46,7 +45,6 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.BeforeClass;
@@ -260,10 +258,9 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
- assertEquals(_segmentAssignmentWithoutPartition
- .rebalanceTable(currentAssignment,
_instancePartitionsMapWithoutPartition, null, null,
- new BaseConfiguration()),
- currentAssignment);
+ assertEquals(
+ _segmentAssignmentWithoutPartition.rebalanceTable(currentAssignment,
_instancePartitionsMapWithoutPartition,
+ null, null, new RebalanceConfig()), currentAssignment);
}
@Test
@@ -288,10 +285,9 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
- assertEquals(_segmentAssignmentWithPartition
- .rebalanceTable(currentAssignment,
_instancePartitionsMapWithPartition, null, null,
- new BaseConfiguration()),
- currentAssignment);
+ assertEquals(
+ _segmentAssignmentWithPartition.rebalanceTable(currentAssignment,
_instancePartitionsMapWithPartition, null,
+ null, new RebalanceConfig()), currentAssignment);
}
@Test
@@ -305,10 +301,11 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
}
// Bootstrap table should reassign all segments based on their
alphabetical order
- Configuration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
- Map<String, Map<String, String>> newAssignment =
_segmentAssignmentWithoutPartition
- .rebalanceTable(currentAssignment,
_instancePartitionsMapWithoutPartition, null, null, rebalanceConfig);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setBootstrap(true);
+ Map<String, Map<String, String>> newAssignment =
+ _segmentAssignmentWithoutPartition.rebalanceTable(currentAssignment,
_instancePartitionsMapWithoutPartition,
+ null, null, rebalanceConfig);
assertEquals(newAssignment.size(), NUM_SEGMENTS);
List<String> sortedSegments = new ArrayList<>(SEGMENTS);
sortedSegments.sort(null);
@@ -328,10 +325,11 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
}
// Bootstrap table should reassign all segments based on their
alphabetical order within the partition
- Configuration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
- Map<String, Map<String, String>> newAssignment =
_segmentAssignmentWithPartition
- .rebalanceTable(currentAssignment,
_instancePartitionsMapWithPartition, null, null, rebalanceConfig);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setBootstrap(true);
+ Map<String, Map<String, String>> newAssignment =
+ _segmentAssignmentWithPartition.rebalanceTable(currentAssignment,
_instancePartitionsMapWithPartition, null,
+ null, rebalanceConfig);
assertEquals(newAssignment.size(), NUM_SEGMENTS);
int numSegmentsPerPartition = NUM_SEGMENTS / NUM_PARTITIONS;
String[][] partitionIdToSegmentsMap = new
String[NUM_PARTITIONS][numSegmentsPerPartition];
@@ -363,9 +361,9 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
SEGMENTS.forEach(segName -> unbalancedAssignment.put(segName, ImmutableMap
.of(instance0, SegmentStateModel.ONLINE, instance1,
SegmentStateModel.ONLINE, instance2,
SegmentStateModel.ONLINE)));
- Map<String, Map<String, String>> balancedAssignment =
_segmentAssignmentWithPartition
- .rebalanceTable(unbalancedAssignment,
_instancePartitionsMapWithoutPartition, null, null,
- new BaseConfiguration());
+ Map<String, Map<String, String>> balancedAssignment =
+ _segmentAssignmentWithPartition.rebalanceTable(unbalancedAssignment,
_instancePartitionsMapWithoutPartition,
+ null, null, new RebalanceConfig());
int[] actualNumSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(balancedAssignment,
INSTANCES);
int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES];
@@ -451,13 +449,13 @@ public class ReplicaGroupSegmentAssignmentStrategyTest {
// Current assignment should already be balanced
assertEquals(
- segmentAssignment.rebalanceTable(currentAssignment,
instancePartitionsMap, null, null, new BaseConfiguration()),
+ segmentAssignment.rebalanceTable(currentAssignment,
instancePartitionsMap, null, null, new RebalanceConfig()),
currentAssignment);
// Test bootstrap
// Bootstrap table should reassign all segments based on their
alphabetical order within the partition
- Configuration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.setProperty(RebalanceConfigConstants.BOOTSTRAP, true);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setBootstrap(true);
Map<String, Map<String, String>> newAssignment =
segmentAssignment.rebalanceTable(currentAssignment,
instancePartitionsMap, null, null, rebalanceConfig);
assertEquals(newAssignment.size(), NUM_SEGMENTS);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 6bf4710578..efe47d6931 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -24,8 +24,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.tier.TierFactory;
@@ -42,7 +40,6 @@ import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitio
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.AfterClass;
@@ -100,7 +97,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
// Rebalance should fail without creating the table
- RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
new BaseConfiguration());
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
new RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
// Create the table
@@ -117,7 +114,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
// Rebalance should return NO_OP status
- rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
// All servers should be assigned to the table
@@ -140,8 +137,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
}
// Rebalance in dry-run mode
- Configuration rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, true);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
@@ -170,8 +167,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
oldSegmentAssignment);
// Rebalance with 3 min available replicas should fail as the table only
have 3 replicas
- rebalanceConfig = new BaseConfiguration();
-
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
3);
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setMinAvailableReplicas(3);
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
@@ -180,8 +177,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
oldSegmentAssignment);
// Rebalance with 2 min available replicas should succeed
- rebalanceConfig = new BaseConfiguration();
-
rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
2);
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setMinAvailableReplicas(2);
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
@@ -202,7 +199,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.updateTableConfig(tableConfig);
// No need to reassign instances because instances should be automatically
assigned when updating the table config
- rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
// There should be 3 replica-groups, each with 2 servers
@@ -249,15 +246,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
// Without instances reassignment, the rebalance should return status
NO_OP, and the existing instance partitions
// should be used
- rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
assertEquals(rebalanceResult.getInstanceAssignment(), instanceAssignment);
assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment);
// With instances reassignment, the rebalance should return status DONE,
the existing instance partitions should be
// removed, and the default instance partitions should be used
- rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES,
true);
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setReassignInstances(true);
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
assertNull(InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
@@ -284,8 +281,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
}
// Rebalance with downtime should succeed
- rebalanceConfig = new BaseConfiguration();
- rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, true);
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDowntime(true);
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
@@ -350,7 +347,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
- RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
new BaseConfiguration());
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
new RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
// Segment assignment should not change
assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -366,7 +363,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER,
TIER_B_NAME, 3, 3, 0));
// rebalance is NOOP and no change in assignment caused by new instances
- rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
// Segment assignment should not change
assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -384,7 +381,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.updateTableConfig(tableConfig);
// rebalance should change assignment
- rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
// check that segments have moved to tiers
@@ -441,7 +438,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
- RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
new BaseConfiguration());
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
new RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
// Segment assignment should not change
assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -453,7 +450,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
}
_helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER,
"replicaAssignment" + TIER_A_NAME, 6, 6, 0));
// rebalance is NOOP and no change in assignment caused by new instances
- rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
// Segment assignment should not change
assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -465,7 +462,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.updateTableConfig(tableConfig);
// rebalance should change assignment
- rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
// check that segments have moved to tier a
@@ -487,7 +484,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
_helixResourceManager.updateTableConfig(tableConfig);
- rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig());
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME));
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index 6bb7fbebb1..76189e9268 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -29,13 +29,13 @@ import
org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.AfterClass;
@@ -97,10 +97,10 @@ public class TenantRebalancerTest extends ControllerTest {
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME_B).getRecord().getMapFields();
// rebalance the tables on test tenant
- TenantRebalanceContext context = new TenantRebalanceContext();
- context.setTenantName(TENANT_NAME);
- context.setVerboseResult(true);
- TenantRebalanceResult result = tenantRebalancer.rebalance(context);
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ TenantRebalanceResult result = tenantRebalancer.rebalance(config);
RebalanceResult rebalanceResult =
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_B);
Map<String, Map<String, String>> rebalancedAssignment =
rebalanceResult.getSegmentAssignment();
// assignment should not change, with a NO_OP status as no now server is
added to test tenant
@@ -108,8 +108,8 @@ public class TenantRebalancerTest extends ControllerTest {
assertEquals(oldSegmentAssignment, rebalancedAssignment);
// rebalance the tables on default tenant
- context.setTenantName(DEFAULT_TENANT_NAME);
- result = tenantRebalancer.rebalance(context);
+ config.setTenantName(DEFAULT_TENANT_NAME);
+ result = tenantRebalancer.rebalance(config);
// rebalancing default tenant should distribute the segment of table A
over 6 servers
rebalanceResult =
result.getRebalanceTableResults().get(OFFLINE_TABLE_NAME_A);
InstancePartitions partitions =
rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.OFFLINE);
@@ -152,7 +152,8 @@ public class TenantRebalancerTest extends ControllerTest {
if (controllerJobZKMetadata == null) {
return null;
}
- return JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
+ return JsonUtils.stringToObject(
+ controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
TenantRebalanceProgressStats.class);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java
deleted file mode 100644
index 37cd4f3f25..0000000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.utils;
-
-/**
- * Constants for rebalance config properties
- */
-public class RebalanceConfigConstants {
- private RebalanceConfigConstants() {
- }
-
- // Unique Id for rebalance
- public static final String JOB_ID = "jobId";
-
- // Progress of the Rebalance operartion
- public static final String REBALANCE_PROGRESS_STATS =
"REBALANCE_PROGRESS_STATS";
-
- // Whether to rebalance table in dry-run mode
- public static final String DRY_RUN = "dryRun";
- public static final boolean DEFAULT_DRY_RUN = false;
-
- // Whether to reassign instances before reassigning segments
- public static final String REASSIGN_INSTANCES = "reassignInstances";
- public static final boolean DEFAULT_REASSIGN_INSTANCES = false;
-
- // Whether to reassign CONSUMING segments
- public static final String INCLUDE_CONSUMING = "includeConsuming";
- public static final boolean DEFAULT_INCLUDE_CONSUMING = false;
-
- // Whether to rebalance table in bootstrap mode (regardless of minimum
segment movement, reassign all segments in a
- // round-robin fashion as if adding new segments to an empty table)
- public static final String BOOTSTRAP = "bootstrap";
- public static final boolean DEFAULT_BOOTSTRAP = false;
-
- // Whether to allow downtime for the rebalance
- public static final String DOWNTIME = "downtime";
- public static final boolean DEFAULT_DOWNTIME = false;
-
- // For no-downtime rebalance, minimum number of replicas to keep alive
during rebalance, or maximum number of replicas
- // allowed to be unavailable if value is negative
- public static final String MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME =
"minReplicasToKeepUpForNoDowntime";
- public static final int DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME = 1;
-
- // Whether to use best-efforts to rebalance (not fail the rebalance when the
no-downtime contract cannot be achieved)
- // When using best-efforts to rebalance, the following scenarios won't fail
the rebalance (will log warnings instead):
- // - Segment falls into ERROR state in ExternalView -> count ERROR state as
good state
- // - ExternalView has not converged within the maximum wait time -> continue
to the next stage
- public static final String BEST_EFFORTS = "bestEfforts";
- public static final boolean DEFAULT_BEST_EFFORTS = false;
-
- // The check on external view can be very costly when the table has very
large ideal and external states, i.e. when
- // having a huge number of segments. These two configs help reduce the cpu
load on controllers, e.g. by doing the
- // check less frequently and bail out sooner to rebalance at best effort if
configured so.
- public static final String EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS =
"externalViewCheckIntervalInMs";
- public static final long DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS =
1_000L; // 1 second
- public static final String EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS =
"externalViewStabilizationTimeoutInMs";
- public static final long DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS =
60 * 60_000L; // 1 hour
- public static final String UPDATE_TARGET_TIER = "updateTargetTier";
- public static final boolean DEFAULT_UPDATE_TARGET_TIER = false;
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 3a57293288..a4c85505dc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -28,7 +28,6 @@ import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.StringUtil;
@@ -199,29 +198,26 @@ public class ControllerRequestURLBuilder {
}
public String forTableRebalance(String tableName, String tableType) {
- return forTableRebalance(tableName, tableType,
RebalanceConfigConstants.DEFAULT_DRY_RUN,
- RebalanceConfigConstants.DEFAULT_REASSIGN_INSTANCES,
RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING,
- RebalanceConfigConstants.DEFAULT_DOWNTIME,
- RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME);
+ return forTableRebalance(tableName, tableType, false, false, false, false,
1);
}
public String forTableRebalance(String tableName, String tableType, boolean
dryRun, boolean reassignInstances,
boolean includeConsuming, boolean downtime, int minAvailableReplicas) {
StringBuilder stringBuilder =
new StringBuilder(StringUtil.join("/", _baseUrl, "tables", tableName,
"rebalance?type=" + tableType));
- if (dryRun != RebalanceConfigConstants.DEFAULT_DRY_RUN) {
+ if (dryRun) {
stringBuilder.append("&dryRun=").append(dryRun);
}
- if (reassignInstances !=
RebalanceConfigConstants.DEFAULT_REASSIGN_INSTANCES) {
+ if (reassignInstances) {
stringBuilder.append("&reassignInstances=").append(reassignInstances);
}
- if (includeConsuming !=
RebalanceConfigConstants.DEFAULT_INCLUDE_CONSUMING) {
+ if (includeConsuming) {
stringBuilder.append("&includeConsuming=").append(includeConsuming);
}
- if (downtime != RebalanceConfigConstants.DEFAULT_DOWNTIME) {
+ if (downtime) {
stringBuilder.append("&downtime=").append(downtime);
}
- if (minAvailableReplicas !=
RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME) {
+ if (minAvailableReplicas != 1) {
stringBuilder.append("&minAvailableReplicas=").append(minAvailableReplicas);
}
return stringBuilder.toString();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index 6d1d58f743..0a68242703 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -19,39 +19,33 @@
package org.apache.pinot.tools;
import com.google.common.base.Preconditions;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
/**
* Helper class for pinot-admin tool's RebalanceTable command.
*/
public class PinotTableRebalancer extends PinotZKChanger {
- private final Configuration _rebalanceConfig = new BaseConfiguration();
+ private final RebalanceConfig _rebalanceConfig = new RebalanceConfig();
public PinotTableRebalancer(String zkAddress, String clusterName, boolean
dryRun, boolean reassignInstances,
boolean includeConsuming, boolean bootstrap, boolean downtime, int
minReplicasToKeepUpForNoDowntime,
boolean bestEffort, long externalViewCheckIntervalInMs, long
externalViewStabilizationTimeoutInMs) {
super(zkAddress, clusterName);
- _rebalanceConfig.addProperty(RebalanceConfigConstants.DRY_RUN, dryRun);
- _rebalanceConfig.addProperty(RebalanceConfigConstants.REASSIGN_INSTANCES,
reassignInstances);
- _rebalanceConfig.addProperty(RebalanceConfigConstants.INCLUDE_CONSUMING,
includeConsuming);
- _rebalanceConfig.addProperty(RebalanceConfigConstants.BOOTSTRAP,
bootstrap);
- _rebalanceConfig.addProperty(RebalanceConfigConstants.DOWNTIME, downtime);
- _rebalanceConfig.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
- minReplicasToKeepUpForNoDowntime);
- _rebalanceConfig.addProperty(RebalanceConfigConstants.BEST_EFFORTS,
bestEffort);
- _rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS,
- externalViewCheckIntervalInMs);
- _rebalanceConfig.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS,
- externalViewStabilizationTimeoutInMs);
- _rebalanceConfig.addProperty(RebalanceConfigConstants.JOB_ID,
- TableRebalancer.createUniqueRebalanceJobIdentifier());
+ _rebalanceConfig.setDryRun(dryRun);
+ _rebalanceConfig.setReassignInstances(reassignInstances);
+ _rebalanceConfig.setIncludeConsuming(includeConsuming);
+ _rebalanceConfig.setBootstrap(bootstrap);
+ _rebalanceConfig.setDowntime(downtime);
+ _rebalanceConfig.setMinAvailableReplicas(minReplicasToKeepUpForNoDowntime);
+ _rebalanceConfig.setBestEfforts(bestEffort);
+ _rebalanceConfig.setExternalViewCheckIntervalInMs(externalViewCheckIntervalInMs);
+ _rebalanceConfig.setExternalViewStabilizationTimeoutInMs(externalViewStabilizationTimeoutInMs);
+ _rebalanceConfig.setJobId(TableRebalancer.createUniqueRebalanceJobIdentifier());
}
public RebalanceResult rebalance(String tableNameWithType) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
index 258c66cea2..5b4c3260d4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/RebalanceTableCommand.java
@@ -18,9 +18,9 @@
*/
package org.apache.pinot.tools.admin.command;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.tools.Command;
import org.apache.pinot.tools.PinotTableRebalancer;
import org.slf4j.Logger;
@@ -80,12 +80,12 @@ public class RebalanceTableCommand extends
AbstractBaseAdminCommand implements C
@CommandLine.Option(names = {"-externalViewCheckIntervalInMs"},
description = "How often to check if external view converges with ideal
view")
- private long _externalViewCheckIntervalInMs =
RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS;
+ private long _externalViewCheckIntervalInMs =
RebalanceConfig.DEFAULT_EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS;
@CommandLine.Option(names = {"-externalViewStabilizationTimeoutInMs"},
description = "How long to wait till external view converges with ideal
view")
private long _externalViewStabilizationTimeoutInMs =
- RebalanceConfigConstants.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS;
+ RebalanceConfig.DEFAULT_EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS;
@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true,
description = "Print this message")
private boolean _help = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]