This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2f2ea415407 Replace Helix AutoRebalanceStrategy with deterministic
algorithm (#16135)
2f2ea415407 is described below
commit 2f2ea4154071a0e42b26f62bf8db93c04833fe9f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jun 18 12:52:53 2025 -0600
Replace Helix AutoRebalanceStrategy with deterministic algorithm (#16135)
---
.../assignment/segment/SegmentAssignmentUtils.java | 83 +++--
.../BalancedNumSegmentAssignmentStrategy.java | 11 +-
.../segment/SegmentAssignmentUtilsTest.java | 26 +-
.../TableRebalancerClusterStatelessTest.java | 350 +++++++++------------
4 files changed, 222 insertions(+), 248 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 4e5aec1b60e..027f8defba9 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -24,14 +24,12 @@ import it.unimi.dsi.fastutil.ints.IntIntPair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.HelixManager;
-import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
@@ -140,25 +138,72 @@ public class SegmentAssignmentUtils {
return instancesAssigned;
}
- /**
- * Rebalances the table with Helix AutoRebalanceStrategy.
- */
- public static Map<String, Map<String, String>>
rebalanceTableWithHelixAutoRebalanceStrategy(
- Map<String, Map<String, String>> currentAssignment, List<String>
instances, int replication) {
- // Use Helix AutoRebalanceStrategy to rebalance the table
- LinkedHashMap<String, Integer> states = new LinkedHashMap<>();
- states.put(SegmentStateModel.ONLINE, replication);
- AutoRebalanceStrategy autoRebalanceStrategy =
- new AutoRebalanceStrategy(null, new
ArrayList<>(currentAssignment.keySet()), states);
- // Make a copy of the current assignment because this step might change
the passed in assignment
- Map<String, Map<String, String>> currentAssignmentCopy = new TreeMap<>();
+ /// Rebalances the table with non-replica-group based segment assignment
strategy by uniformly spraying segment
+ /// replicas to the servers.
+ /// 1. Calculate the target number of segments on each server
+ /// 2. Loop over all the segments and keep the assignment if target number
of segments for the server has not been
+ /// reached and track the not assigned segments
+ /// 3. Assign the left-over segments to the servers with the least segments,
or the smallest index if there is a tie
+ public static Map<String, Map<String, String>>
rebalanceNonReplicaGroupBasedTable(
+ Map<String, Map<String, String>> currentAssignment, List<String>
servers, int replication) {
+ Map<String, Integer> serverIds = getInstanceNameToIdMap(servers);
+
+ // Calculate target number of segments per server
+ // NOTE: in order to minimize the segment movements, use the ceiling of
the quotient
+ int numServers = servers.size();
+ int numSegments = currentAssignment.size();
+ int targetNumSegmentsPerServer = (numSegments * replication + numServers -
1) / numServers;
+
+ // Do not move segment if target number of segments is not reached, track
the segments need to be moved
+ Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+ int[] numSegmentsAssignedPerServer = new int[numServers];
+ List<String> segmentsNotAssigned = new ArrayList<>();
for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
- String segmentName = entry.getKey();
- Map<String, String> instanceStateMap = entry.getValue();
- currentAssignmentCopy.put(segmentName, new TreeMap<>(instanceStateMap));
+ String segment = entry.getKey();
+ Set<String> currentServers = entry.getValue().keySet();
+ int remainingReplicas = replication;
+ for (String server : currentServers) {
+ Integer serverId = serverIds.get(server);
+ if (serverId != null && numSegmentsAssignedPerServer[serverId] <
targetNumSegmentsPerServer) {
+ newAssignment.computeIfAbsent(segment, k -> new
TreeMap<>()).put(server, SegmentStateModel.ONLINE);
+ numSegmentsAssignedPerServer[serverId]++;
+ remainingReplicas--;
+ if (remainingReplicas == 0) {
+ break;
+ }
+ }
+ }
+ for (int i = 0; i < remainingReplicas; i++) {
+ segmentsNotAssigned.add(segment);
+ }
+ }
+
+ // Assign each not assigned segment to the server with the least segments,
or the smallest id if there is a tie
+ PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numServers,
Pairs.intPairComparator());
+ for (int serverId = 0; serverId < numServers; serverId++) {
+ heap.add(new Pairs.IntPair(numSegmentsAssignedPerServer[serverId],
serverId));
+ }
+ List<Pairs.IntPair> skippedPairs = new ArrayList<>();
+ for (String segment : segmentsNotAssigned) {
+ Map<String, String> instanceStateMap =
newAssignment.computeIfAbsent(segment, k -> new TreeMap<>());
+ while (true) {
+ Pairs.IntPair intPair = heap.remove();
+ int serverId = intPair.getRight();
+ String server = servers.get(serverId);
+ // Skip the server if it already has the segment
+ if (instanceStateMap.put(server, SegmentStateModel.ONLINE) == null) {
+ intPair.setLeft(intPair.getLeft() + 1);
+ heap.add(intPair);
+ break;
+ } else {
+ skippedPairs.add(intPair);
+ }
+ }
+ heap.addAll(skippedPairs);
+ skippedPairs.clear();
}
- return autoRebalanceStrategy.computePartitionAssignment(instances,
instances, currentAssignmentCopy, null)
- .getMapFields();
+
+ return newAssignment;
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
index e9c540da78a..c1270705848 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java
@@ -42,17 +42,15 @@ import org.slf4j.LoggerFactory;
public class BalancedNumSegmentAssignmentStrategy implements
SegmentAssignmentStrategy {
private static final Logger LOGGER =
LoggerFactory.getLogger(BalancedNumSegmentAssignmentStrategy.class);
- private String _tableNameWithType;
private int _replication;
@Override
public void init(HelixManager helixManager, TableConfig tableConfig) {
- _tableNameWithType = tableConfig.getTableName();
SegmentsValidationAndRetentionConfig validationAndRetentionConfig =
tableConfig.getValidationConfig();
Preconditions.checkState(validationAndRetentionConfig != null, "Validation
Config is null");
_replication = tableConfig.getReplication();
- LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: "
+ "{} with replication: {}",
- _tableNameWithType, _replication);
+ LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table:
{} with replication: {}",
+ tableConfig.getTableName(), _replication);
}
@Override
@@ -66,12 +64,9 @@ public class BalancedNumSegmentAssignmentStrategy implements
SegmentAssignmentSt
public Map<String, Map<String, String>> reassignSegments(Map<String,
Map<String, String>> currentAssignment,
InstancePartitions instancePartitions, InstancePartitionsType
instancePartitionsType) {
validateSegmentAssignmentStrategy(instancePartitions);
- Map<String, Map<String, String>> newAssignment;
List<String> instances =
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions,
_replication);
- newAssignment =
-
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
instances, _replication);
- return newAssignment;
+ return
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment,
instances, _replication);
}
private void validateSegmentAssignmentStrategy(InstancePartitions
instancePartitions) {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
index 0f43b7869df..306e391ce0c 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
@@ -76,8 +76,7 @@ public class SegmentAssignmentUtilsTest {
Arrays.fill(expectedNumSegmentsAssignedPerInstance,
numSegmentsPerInstance);
assertEquals(numSegmentsAssignedPerInstance,
expectedNumSegmentsAssignedPerInstance);
// Current assignment should already be balanced
- assertEquals(
-
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
instances, NUM_REPLICAS),
+
assertEquals(SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment,
instances, NUM_REPLICAS),
currentAssignment);
// Replace instance_0 with instance_10
@@ -85,8 +84,7 @@ public class SegmentAssignmentUtilsTest {
String newInstanceName = INSTANCE_NAME_PREFIX + 10;
newInstances.set(0, newInstanceName);
Map<String, Map<String, String>> newAssignment =
-
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances,
- NUM_REPLICAS);
+
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment,
newInstances, NUM_REPLICAS);
// There should be 100 segments assigned
assertEquals(currentAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -116,8 +114,8 @@ public class SegmentAssignmentUtilsTest {
// }
int newNumInstances = numInstances - 5;
newInstances =
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
- newAssignment =
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances,
- NUM_REPLICAS);
+ newAssignment =
+
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment,
newInstances, NUM_REPLICAS);
// There should be 100 segments assigned
assertEquals(newAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -127,19 +125,19 @@ public class SegmentAssignmentUtilsTest {
// The segments are not perfectly balanced, but should be deterministic
numSegmentsAssignedPerInstance =
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment,
newInstances);
- assertEquals(numSegmentsAssignedPerInstance[0], 56);
+ assertEquals(numSegmentsAssignedPerInstance[0], 60);
assertEquals(numSegmentsAssignedPerInstance[1], 60);
assertEquals(numSegmentsAssignedPerInstance[2], 60);
assertEquals(numSegmentsAssignedPerInstance[3], 60);
- assertEquals(numSegmentsAssignedPerInstance[4], 64);
+ assertEquals(numSegmentsAssignedPerInstance[4], 60);
numSegmentsToMovePerInstance =
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment);
assertEquals(numSegmentsToMovePerInstance.size(), numInstances);
- assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0),
IntIntPair.of(26, 0));
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0),
IntIntPair.of(30, 0));
assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 1),
IntIntPair.of(30, 0));
assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 2),
IntIntPair.of(30, 0));
assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 3),
IntIntPair.of(30, 0));
- assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4),
IntIntPair.of(34, 0));
+ assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4),
IntIntPair.of(30, 0));
for (int i = 5; i < 10; i++) {
assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i),
IntIntPair.of(0, 30));
}
@@ -150,8 +148,8 @@ public class SegmentAssignmentUtilsTest {
// }
newNumInstances = numInstances + 5;
newInstances =
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
- newAssignment =
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances,
- NUM_REPLICAS);
+ newAssignment =
+
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment,
newInstances, NUM_REPLICAS);
// There should be 100 segments assigned
assertEquals(newAssignment.size(), numSegments);
// Each segment should have 3 replicas
@@ -182,8 +180,8 @@ public class SegmentAssignmentUtilsTest {
// }
String newInstanceNamePrefix = "i_";
newInstances =
SegmentAssignmentTestUtils.getNameList(newInstanceNamePrefix, numInstances);
- newAssignment =
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment,
newInstances,
- NUM_REPLICAS);
+ newAssignment =
+
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment,
newInstances, NUM_REPLICAS);
// There should be 100 segments assigned
assertEquals(newAssignment.size(), numSegments);
// Each segment should have 3 replicas
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 64b8a6e5f5d..f4964021ea9 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -99,7 +99,6 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
}
- ///
/// Dropping instance from cluster requires waiting for live instance gone
and removing instance related ZNodes, which
/// are not the purpose of the test, so combine different rebalance
scenarios into one test:
/// 1. NO_OP rebalance
@@ -107,7 +106,6 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
/// 3. Migrate to replica-group based segment assignment and rebalance
/// 4. Migrate back to non-replica-group based segment assignment and
rebalance
/// 5. Remove (disable) servers and rebalance
- ///
@Test
public void testRebalance()
throws Exception {
@@ -119,16 +117,14 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
for (int i = 0; i < numServers; i++) {
String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
- DiskUsageInfo diskUsageInfo1 =
- new DiskUsageInfo(instanceId, "", 1000L, 500L,
System.currentTimeMillis());
- diskUsageInfoMap.put(instanceId, diskUsageInfo1);
+ DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L,
500L, System.currentTimeMillis());
+ diskUsageInfoMap.put(instanceId, diskUsageInfo);
}
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 1);
- TableRebalancer tableRebalancer =
- new TableRebalancer(_helixManager, null, null, preChecker,
_tableSizeReader);
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager,
null, null, preChecker, _tableSizeReader);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
@@ -175,8 +171,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
assertNotNull(rebalanceSummaryResult.getTagsInfo());
assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
- assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
- TagNameUtils.getOfflineTagForTenant(null));
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
TagNameUtils.getOfflineTagForTenant(null));
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
numSegments * NUM_REPLICAS);
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers);
@@ -211,8 +206,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
for (int i = 0; i < numServersToAdd; i++) {
String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
- DiskUsageInfo diskUsageInfo =
- new DiskUsageInfo(instanceId, "", 1000L, 500L,
System.currentTimeMillis());
+ DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L,
500L, System.currentTimeMillis());
diskUsageInfoMap.put(instanceId, diskUsageInfo);
}
@@ -227,18 +221,17 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertNotNull(rebalanceSummaryResult);
assertNotNull(rebalanceSummaryResult.getServerInfo());
assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
14);
-
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
14);
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
15);
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
15);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
3);
assertNotNull(rebalanceSummaryResult.getTagsInfo());
assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
- assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
- TagNameUtils.getOfflineTagForTenant(null));
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
14);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
TagNameUtils.getOfflineTagForTenant(null));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
15);
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
- numSegments * NUM_REPLICAS - 14);
+ numSegments * NUM_REPLICAS - 15);
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers + numServersToAdd);
assertNotNull(rebalanceResult.getInstanceAssignment());
@@ -251,19 +244,19 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
// Original servers should be losing some segments
String newServer = SERVER_INSTANCE_ID_PREFIX + i;
RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange =
serverSegmentChangeInfoMap.get(newServer);
- assertTrue(serverSegmentChange.getSegmentsDeleted() > 0);
- assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
- assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
- assertTrue(serverSegmentChange.getTotalSegmentsAfterRebalance() > 0);
+ assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(),
10);
+ assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5);
assertEquals(serverSegmentChange.getSegmentsAdded(), 0);
+ assertEquals(serverSegmentChange.getSegmentsDeleted(), 5);
+ assertEquals(serverSegmentChange.getSegmentsUnchanged(), 5);
}
for (int i = 0; i < numServersToAdd; i++) {
// New servers should only get new segments
String newServer = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange =
serverSegmentChangeInfoMap.get(newServer);
- assertTrue(serverSegmentChange.getSegmentsAdded() > 0);
assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 0);
- assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(),
serverSegmentChange.getSegmentsAdded());
+ assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5);
+ assertEquals(serverSegmentChange.getSegmentsAdded(), 5);
assertEquals(serverSegmentChange.getSegmentsDeleted(), 0);
assertEquals(serverSegmentChange.getSegmentsUnchanged(), 0);
}
@@ -301,16 +294,14 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
"Instance assignment not allowed, no need for minimizeDataMovement");
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.PASS);
- assertTrue(
-
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)
- .getMessage()
- .startsWith("Within threshold"));
+
assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)
+ .getMessage()
+ .startsWith("Within threshold"));
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.PASS);
- assertTrue(
-
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)
- .getMessage()
- .startsWith("Within threshold"));
+
assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)
+ .getMessage()
+ .startsWith("Within threshold"));
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.PASS);
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS).getMessage(),
@@ -336,17 +327,17 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
Map<String, IntIntPair> instanceToNumSegmentsToMoveMap =
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(oldSegmentAssignment,
newSegmentAssignment);
assertEquals(instanceToNumSegmentsToMoveMap.size(), numServers +
numServersToAdd);
- for (int i = 0; i < numServersToAdd; i++) {
- IntIntPair numSegmentsToMove =
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers +
i));
- assertNotNull(numSegmentsToMove);
- assertTrue(numSegmentsToMove.leftInt() > 0);
- assertEquals(numSegmentsToMove.rightInt(), 0);
- }
for (int i = 0; i < numServers; i++) {
IntIntPair numSegmentsToMove =
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + i);
assertNotNull(numSegmentsToMove);
assertEquals(numSegmentsToMove.leftInt(), 0);
- assertTrue(numSegmentsToMove.rightInt() > 0);
+ assertEquals(numSegmentsToMove.rightInt(), 5);
+ }
+ for (int i = 0; i < numServersToAdd; i++) {
+ IntIntPair numSegmentsToMove =
instanceToNumSegmentsToMoveMap.get(SERVER_INSTANCE_ID_PREFIX + (numServers +
i));
+ assertNotNull(numSegmentsToMove);
+ assertEquals(numSegmentsToMove.leftInt(), 5);
+ assertEquals(numSegmentsToMove.rightInt(), 0);
}
// Dry-run mode should not change the IdealState
@@ -416,25 +407,25 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.WARN);
-
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)
- .getMessage(), "reassignInstances is disabled, replica groups may
not be updated.\nOFFLINE segments "
- + "- numReplicaGroups: " + NUM_REPLICAS + ",
numInstancesPerReplicaGroup: 0 (using as many instances as "
- + "possible)");
+ assertEquals(
+
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
+ "reassignInstances is disabled, replica groups may not be
updated.\nOFFLINE segments "
+ + "- numReplicaGroups: " + NUM_REPLICAS + ",
numInstancesPerReplicaGroup: 0 (using as many instances as "
+ + "possible)");
rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
assertNotNull(rebalanceSummaryResult);
assertNotNull(rebalanceSummaryResult.getServerInfo());
assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
11);
-
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
11);
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
20);
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(),
20);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
assertNotNull(rebalanceSummaryResult.getTagsInfo());
assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
- assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
- TagNameUtils.getOfflineTagForTenant(null));
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
11);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
TagNameUtils.getOfflineTagForTenant(null));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
20);
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
- numSegments * NUM_REPLICAS - 11);
+ numSegments * NUM_REPLICAS - 20);
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers + numServersToAdd);
assertNotNull(rebalanceResult.getInstanceAssignment());
@@ -445,11 +436,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
for (int i = 0; i < numServers + numServersToAdd; i++) {
String newServer = SERVER_INSTANCE_ID_PREFIX + i;
RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange =
serverSegmentChangeInfoMap.get(newServer);
+ assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 5);
assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5);
- // Ensure not all segments moved
- assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
- // Ensure all segments has something assigned prior to rebalance
- assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
}
// Dry-run mode should not change the IdealState
@@ -515,8 +503,9 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.PASS);
-
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)
- .getMessage(), "OFFLINE segments - Replica Groups are not enabled,
replication: " + NUM_REPLICAS);
+ assertEquals(
+
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
+ "OFFLINE segments - Replica Groups are not enabled, replication: " +
NUM_REPLICAS);
rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
assertNotNull(rebalanceSummaryResult);
assertNotNull(rebalanceSummaryResult.getServerInfo());
@@ -528,11 +517,9 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
assertNotNull(rebalanceSummaryResult.getTagsInfo());
assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
- assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
- TagNameUtils.getOfflineTagForTenant(null));
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
TagNameUtils.getOfflineTagForTenant(null));
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
- numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
numSegments * NUM_REPLICAS);
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers + numServersToAdd);
assertNotNull(rebalanceResult.getInstanceAssignment());
@@ -557,8 +544,9 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.PASS);
-
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO)
- .getMessage(), "OFFLINE segments - Replica Groups are not enabled,
replication: " + NUM_REPLICAS);
+ assertEquals(
+
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getMessage(),
+ "OFFLINE segments - Replica Groups are not enabled, replication: " +
NUM_REPLICAS);
rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
assertNotNull(rebalanceSummaryResult);
assertNotNull(rebalanceSummaryResult.getServerInfo());
@@ -571,11 +559,9 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
assertNotNull(rebalanceSummaryResult.getTagsInfo());
assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
- assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
- TagNameUtils.getOfflineTagForTenant(null));
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
TagNameUtils.getOfflineTagForTenant(null));
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
- numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
numSegments * NUM_REPLICAS);
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers + numServersToAdd);
assertNotNull(rebalanceResult.getInstanceAssignment());
@@ -628,13 +614,11 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
3);
assertNotNull(rebalanceSummaryResult.getTagsInfo());
assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
- assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
- TagNameUtils.getOfflineTagForTenant(null));
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
TagNameUtils.getOfflineTagForTenant(null));
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
15);
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
numSegments * NUM_REPLICAS - 15);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
- numServers);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -701,10 +685,11 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
preChecker.init(_helixResourceManager, executorService, 1);
TableRebalancer tableRebalancer = new TableRebalancer(_helixManager,
null, null, preChecker, _tableSizeReader);
// Set up the table with 1 replication factor and strict replica group enabled
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
- false)).build();
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(1)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
// Create the table
addDummySchema(RAW_TABLE_NAME);
@@ -783,20 +768,17 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1,
false, null);
- InstanceAssignmentConfig instanceAssignmentConfig =
- new InstanceAssignmentConfig(
- new
InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null), false, 0,
null), null,
- replicaGroupPartitionConfig,
-
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
true);
- TableConfig tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
- .setNumReplicas(numReplicas)
- .setRoutingConfig(
- new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
- .setStreamConfigs(
-
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap())
-
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
- .build();
+ InstanceAssignmentConfig instanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null),
false, 0, null), null,
+ replicaGroupPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
true);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplicas)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap())
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
// Create the table
addDummySchema(RAW_TABLE_NAME);
@@ -983,10 +965,11 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
preChecker.init(_helixResourceManager, executorService, 1);
TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker, _tableSizeReader);
// Set up the table with 1 replication factor and strict replica group enabled
- TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
- false)).build();
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(1)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .build();
// Create the table
addDummySchema(RAW_TABLE_NAME);
@@ -1031,16 +1014,14 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
for (int i = 0; i < numServers; i++) {
String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX +
i;
addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
- DiskUsageInfo diskUsageInfo1 =
- new DiskUsageInfo(instanceId, "", 1000L, 200L,
System.currentTimeMillis());
+ DiskUsageInfo diskUsageInfo1 = new DiskUsageInfo(instanceId, "", 1000L,
200L, System.currentTimeMillis());
diskUsageInfoMap.put(instanceId, diskUsageInfo1);
}
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 0.5);
- TableRebalancer tableRebalancer =
- new TableRebalancer(_helixManager, null, null, preChecker,
_tableSizeReader);
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker, _tableSizeReader);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
@@ -1062,8 +1043,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
for (int i = 0; i < numServersToAdd; i++) {
String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX +
(numServers + i);
addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
- DiskUsageInfo diskUsageInfo =
- new DiskUsageInfo(instanceId, "", 1000L, 200L,
System.currentTimeMillis());
+ DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L,
200L, System.currentTimeMillis());
diskUsageInfoMap.put(instanceId, diskUsageInfo);
}
@@ -1081,22 +1061,19 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE));
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE).getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.PASS);
- assertTrue(
-
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)
- .getMessage()
- .startsWith("Within threshold"));
+
assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_DURING_REBALANCE)
+ .getMessage()
+ .startsWith("Within threshold"));
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE));
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE).getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.PASS);
- assertTrue(
-
preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)
- .getMessage()
- .startsWith("Within threshold"));
+
assertTrue(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION_AFTER_REBALANCE)
+ .getMessage()
+ .startsWith("Within threshold"));
for (int i = 0; i < numServers + numServersToAdd; i++) {
String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX +
i;
- DiskUsageInfo diskUsageInfo =
- new DiskUsageInfo(instanceId, "", 1000L, 755L,
System.currentTimeMillis());
+ DiskUsageInfo diskUsageInfo = new DiskUsageInfo(instanceId, "", 1000L,
755L, System.currentTimeMillis());
diskUsageInfoMap.put(instanceId, diskUsageInfo);
}
@@ -1139,13 +1116,11 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 0.5);
- TableRebalancer tableRebalancer =
- new TableRebalancer(_helixManager, null, null, preChecker,
_tableSizeReader);
- TableConfig tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
- .setNumReplicas(2)
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
- .build();
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker, _tableSizeReader);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(2)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+ .build();
// Create the table
addDummySchema(RAW_TABLE_NAME);
@@ -1187,13 +1162,12 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
rebalanceConfig.setUpdateTargetTier(false);
rebalanceConfig.setBootstrap(false);
rebalanceConfig.setBestEfforts(false);
- tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
- .setTierConfigList(Collections.singletonList(
- new TierConfig("dummyTier",
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null,
- TierFactory.PINOT_SERVER_STORAGE_TYPE,
-
TagNameUtils.getRealtimeTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME), null,
null)))
- .build();
+ tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setTierConfigList(Collections.singletonList(
+ new TierConfig("dummyTier",
TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "7d", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE,
+
TagNameUtils.getRealtimeTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME), null,
null)))
+ .build();
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
preCheckerResult =
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS);
@@ -1218,8 +1192,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
"Number of replicas (3) is greater than 1, downtime is not
recommended.");
// no downtime warning with 1 replica
- newTableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(1).build();
+ newTableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(1).build();
rebalanceConfig.setDowntime(true);
rebalanceResult = tableRebalancer.rebalance(newTableConfig,
rebalanceConfig, null);
@@ -1312,8 +1285,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(preCheckerResult.getMessage(),
"Number of segments to add to a single server (" +
expectedNumSegmentsToAdd + ") is high (>"
+ DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + "). It is
recommended to set batchSizePerServer to "
- + DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE
- + " or lower to avoid excessive load on servers.");
+ + DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE + " or lower
to avoid excessive load on servers.");
rebalanceConfig.setBatchSizePerServer(DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE);
rebalanceResult = tableRebalancer.rebalance(newTableConfig,
rebalanceConfig, null);
@@ -1330,13 +1302,11 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
executorService.shutdown();
}
- /**
- * Tests rebalance with tier configs
- * Add 10 segments, with segment metadata end time 3 days apart starting from now to 30 days ago
- * 1. run rebalance - should see no change
- * 2. add nodes for tiers and run rebalance - should see no change
- * 3. add tier config and run rebalance - should see changed assignment
- */
+ /// Tests rebalance with tier configs
+ /// Add 10 segments, with segment metadata end time 3 days apart starting from now to 30 days ago
+ /// 1. run rebalance - should see no change
+ /// 2. add nodes for tiers and run rebalance - should see no change
+ /// 3. add tier config and run rebalance - should see changed assignment
@Test
public void testRebalanceWithTiers()
throws Exception {
@@ -1387,10 +1357,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME));
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
- numSegments * NUM_REPLICAS);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
- numServers);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -1428,10 +1396,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME));
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
- numSegments * NUM_REPLICAS);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
- numServers);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -1554,8 +1520,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
ExecutorService executorService = Executors.newFixedThreadPool(10);
preChecker.init(_helixResourceManager, executorService, 1);
- TableRebalancer tableRebalancer =
- new TableRebalancer(_helixManager, null, null, preChecker,
_tableSizeReader);
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker, _tableSizeReader);
// Try dry-run summary mode
RebalanceConfig rebalanceConfig = new RebalanceConfig();
@@ -1575,10 +1540,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
TagNameUtils.getOfflineTagForTenant("replicaAssignment" +
NO_TIER_NAME));
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
- numSegments * NUM_REPLICAS);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
- numServers);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -1612,10 +1575,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
TagNameUtils.getOfflineTagForTenant("replicaAssignment" +
NO_TIER_NAME));
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
- numSegments * NUM_REPLICAS);
-
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
- numServers);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -1662,14 +1623,11 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
.getNumSegmentsToDownload(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
- .getNumSegmentsUnchanged(),
- 0);
+ .getNumSegmentsUnchanged(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
- .getNumServerParticipants(),
- 0);
+ .getNumServerParticipants(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
- .getNumSegmentsToDownload(),
- numSegments * NUM_REPLICAS);
+ .getNumSegmentsToDownload(), numSegments * NUM_REPLICAS);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
.getNumSegmentsUnchanged(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
@@ -1709,7 +1667,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
rebalanceConfig.setPreChecks(true);
rebalanceConfig.setReassignInstances(true);
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
- assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
assertEquals(
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REPLICA_GROUPS_INFO).getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.PASS);
@@ -1721,7 +1679,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertNotNull(rebalanceSummaryResult);
assertNotNull(rebalanceSummaryResult.getServerInfo());
assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
13);
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
assertNotNull(rebalanceSummaryResult.getTagsInfo());
@@ -1734,16 +1692,13 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
.getNumSegmentsToDownload(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
- .getNumSegmentsUnchanged(),
- 0);
+ .getNumSegmentsUnchanged(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
- .getNumServerParticipants(),
- 0);
+ .getNumServerParticipants(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
- .getNumSegmentsToDownload(),
- 13);
+ .getNumSegmentsToDownload(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
- .getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS - 13);
+ .getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
.getNumServerParticipants(), 6);
assertNotNull(rebalanceResult.getInstanceAssignment());
@@ -1751,7 +1706,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertNotNull(rebalanceResult.getSegmentAssignment());
rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
- assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME));
InstancePartitions instancePartitions =
rebalanceResult.getTierInstanceAssignment().get(TIER_A_NAME);
@@ -1823,26 +1778,20 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
.getNumSegmentsToDownload(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
- .getNumSegmentsUnchanged(),
- 0);
+ .getNumSegmentsUnchanged(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
- .getNumServerParticipants(),
- 0);
+ .getNumServerParticipants(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
- .getNumSegmentsToDownload(),
- 0);
+ .getNumSegmentsToDownload(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
.getNumSegmentsUnchanged(), 0);
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
.getNumServerParticipants(), 0);
- assertEquals(
-
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsToDownload(),
+
assertEquals(tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsToDownload(),
0);
- assertEquals(
-
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsUnchanged(),
+
assertEquals(tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsUnchanged(),
numSegments * NUM_REPLICAS);
- assertEquals(
-
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumServerParticipants(),
+
assertEquals(tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumServerParticipants(),
6);
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
@@ -1852,7 +1801,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
@Test
public void testRebalanceWithMinimizeDataMovementBalanced()
throws Exception {
- int numServers = 6;
+ int numServers = 3;
for (int i = 0; i < numServers; i++) {
addFakeServerInstanceToAutoJoinHelixCluster("minimizeDataMovement_balance_" +
SERVER_INSTANCE_ID_PREFIX + i,
true);
@@ -2111,13 +2060,11 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
}
ConsumingSegmentInfoReader mockConsumingSegmentInfoReader =
Mockito.mock(ConsumingSegmentInfoReader.class);
- TableRebalancer tableRebalancerOriginal =
- new TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
- TableConfig tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
- .setNumReplicas(numReplica)
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
- .build();
+ TableRebalancer tableRebalancerOriginal = new
TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplica)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+ .build();
// Create the table
addDummySchema(RAW_TABLE_NAME);
@@ -2149,11 +2096,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
0);
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size(),
0);
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
0);
- assertEquals(consumingSegmentToBeMovedSummary
- .getServerConsumingSegmentSummary()
- .size(), 0);
- assertTrue(consumingSegmentToBeMovedSummary
- .getServerConsumingSegmentSummary()
+
assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
0);
+
assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary()
.values()
.stream()
.allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments()
== 0));
@@ -2168,11 +2112,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
0);
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size(),
0);
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
0);
- assertEquals(consumingSegmentToBeMovedSummary
- .getServerConsumingSegmentSummary()
- .size(), 0);
- assertTrue(consumingSegmentToBeMovedSummary
- .getServerConsumingSegmentSummary()
+
assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
0);
+
assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary()
.values()
.stream()
.allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments()
== 0));
@@ -2203,11 +2144,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
}
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS);
- assertEquals(consumingSegmentToBeMovedSummary
- .getServerConsumingSegmentSummary()
- .size(), numServers);
- assertTrue(consumingSegmentToBeMovedSummary
- .getServerConsumingSegmentSummary()
+
assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
numServers);
+
assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary()
.values()
.stream()
.allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments()
@@ -2231,13 +2169,11 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
}
- TableRebalancer tableRebalancerOriginal =
- new TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
- TableConfig tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
- .setNumReplicas(numReplica)
-
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
- .build();
+ TableRebalancer tableRebalancerOriginal = new
TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplica)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+ .build();
// Create the table
addDummySchema(RAW_TABLE_NAME);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]