This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6dccd39bc4 Check `batchSizePerServer` in Rebalance Config Pre-check
(#15984)
6dccd39bc4 is described below
commit 6dccd39bc46c4d99e17979c95610deabddb735f7
Author: Jhow <[email protected]>
AuthorDate: Tue Jun 3 17:29:36 2025 -0700
Check `batchSizePerServer` in Rebalance Config Pre-check (#15984)
* add batchSizePerServer check into rebalance config precheck
* add test
* lint: style
* use segmentInfo.maxSegmentsAddedToASingleServer instead
* update test according to the last commit
---
.../core/rebalance/DefaultRebalancePreChecker.java | 22 ++++++++++-
.../helix/core/rebalance/RebalancePreChecker.java | 9 ++++-
.../helix/core/rebalance/TableRebalancer.java | 16 ++++----
.../TableRebalancerClusterStatelessTest.java | 44 +++++++++++++++++++++-
4 files changed, 79 insertions(+), 12 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
index d53256bc18..a893ada3be 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -59,6 +59,9 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
public static final String REBALANCE_CONFIG_OPTIONS =
"rebalanceConfigOptions";
public static final String REPLICA_GROUPS_INFO = "replicaGroupsInfo";
+ public static final int SEGMENT_ADD_THRESHOLD = 200;
+ public static final int RECOMMENDED_BATCH_SIZE = 200;
+
private static double _diskUtilizationThreshold;
protected PinotHelixResourceManager _pinotHelixResourceManager;
@@ -101,7 +104,8 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
preCheckContext.getTableSubTypeSizeDetails(),
_diskUtilizationThreshold, false));
preCheckResult.put(REBALANCE_CONFIG_OPTIONS,
checkRebalanceConfig(rebalanceConfig, tableConfig,
- preCheckContext.getCurrentAssignment(),
preCheckContext.getTargetAssignment()));
+ preCheckContext.getCurrentAssignment(),
preCheckContext.getTargetAssignment(),
+ preCheckContext.getRebalanceSummaryResult()));
preCheckResult.put(REPLICA_GROUPS_INFO, checkReplicaGroups(tableConfig,
rebalanceConfig));
@@ -335,7 +339,8 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
}
private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig
rebalanceConfig, TableConfig tableConfig,
- Map<String, Map<String, String>> currentAssignment, Map<String,
Map<String, String>> targetAssignment) {
+ Map<String, Map<String, String>> currentAssignment, Map<String,
Map<String, String>> targetAssignment,
+ RebalanceSummaryResult rebalanceSummaryResult) {
List<String> warnings = new ArrayList<>();
boolean pass = true;
if (rebalanceConfig.isBestEfforts()) {
@@ -392,6 +397,19 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
warnings.add("updateTargetTier should be enabled when tier configs are
present");
}
+ // --- Batch size per server recommendation check using summary ---
+ int maxSegmentsToAddOnServer =
rebalanceSummaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer();
+ int batchSizePerServer = rebalanceConfig.getBatchSizePerServer();
+ if (maxSegmentsToAddOnServer > SEGMENT_ADD_THRESHOLD) {
+ if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER
+ || batchSizePerServer > RECOMMENDED_BATCH_SIZE) {
+ pass = false;
+ warnings.add("Number of segments to add to a single server (" +
maxSegmentsToAddOnServer + ") is high (>"
+ + SEGMENT_ADD_THRESHOLD + "). It is recommended to set
batchSizePerServer to " + RECOMMENDED_BATCH_SIZE
+ + " or lower to avoid excessive load on servers.");
+ }
+ }
+
return pass ? RebalancePreCheckerResult.pass("All rebalance parameters
look good")
: RebalancePreCheckerResult.warn(StringUtil.join("\n",
warnings.toArray(String[]::new)));
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
index 83b7302d2f..91e80bb497 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
@@ -38,10 +38,12 @@ public interface RebalancePreChecker {
private final Map<String, Map<String, String>> _targetAssignment;
private final TableSizeReader.TableSubTypeSizeDetails
_tableSubTypeSizeDetails;
private final RebalanceConfig _rebalanceConfig;
+ private final RebalanceSummaryResult _rebalanceSummaryResult;
public PreCheckContext(String rebalanceJobId, String tableNameWithType,
TableConfig tableConfig,
Map<String, Map<String, String>> currentAssignment, Map<String,
Map<String, String>> targetAssignment,
- @Nullable TableSizeReader.TableSubTypeSizeDetails
tableSubTypeSizeDetails, RebalanceConfig rebalanceConfig) {
+ @Nullable TableSizeReader.TableSubTypeSizeDetails
tableSubTypeSizeDetails, RebalanceConfig rebalanceConfig,
+ RebalanceSummaryResult rebalanceSummaryResult) {
_rebalanceJobId = rebalanceJobId;
_tableNameWithType = tableNameWithType;
_tableConfig = tableConfig;
@@ -49,6 +51,7 @@ public interface RebalancePreChecker {
_targetAssignment = targetAssignment;
_tableSubTypeSizeDetails = tableSubTypeSizeDetails;
_rebalanceConfig = rebalanceConfig;
+ _rebalanceSummaryResult = rebalanceSummaryResult;
}
public String getRebalanceJobId() {
@@ -78,6 +81,10 @@ public interface RebalancePreChecker {
public RebalanceConfig getRebalanceConfig() {
return _rebalanceConfig;
}
+
+ public RebalanceSummaryResult getRebalanceSummaryResult() {
+ return _rebalanceSummaryResult;
+ }
}
Map<String, RebalancePreCheckerResult> check(PreCheckContext
preCheckContext);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 423c851dcd..3bf80c9f60 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -339,22 +339,24 @@ public class TableRebalancer {
fetchTableSizeDetails(tableNameWithType, tableRebalanceLogger);
Map<String, RebalancePreCheckerResult> preChecksResult = null;
+
+ // Calculate summary up front so that even if the table is already
balanced, the caller can verify whether that
+ // is expected or not based on the summary results
+ RebalanceSummaryResult summaryResult =
+ calculateDryRunSummary(currentAssignment, targetAssignment,
tableNameWithType, tableSubTypeSizeDetails,
+ tableConfig, tableRebalanceLogger);
+
if (preChecks) {
if (_rebalancePreChecker == null) {
tableRebalanceLogger.warn(
"Pre-checks are enabled but the pre-checker is not set, skipping
pre-checks");
} else {
RebalancePreChecker.PreCheckContext preCheckContext =
- new RebalancePreChecker.PreCheckContext(rebalanceJobId,
tableNameWithType,
- tableConfig, currentAssignment, targetAssignment,
tableSubTypeSizeDetails, rebalanceConfig);
+ new RebalancePreChecker.PreCheckContext(rebalanceJobId,
tableNameWithType, tableConfig, currentAssignment,
+ targetAssignment, tableSubTypeSizeDetails, rebalanceConfig,
summaryResult);
preChecksResult = _rebalancePreChecker.check(preCheckContext);
}
}
- // Calculate summary here itself so that even if the table is already
balanced, the caller can verify whether that
- // is expected or not based on the summary results
- RebalanceSummaryResult summaryResult =
- calculateDryRunSummary(currentAssignment, targetAssignment,
tableNameWithType, tableSubTypeSizeDetails,
- tableConfig, tableRebalanceLogger);
if (segmentAssignmentUnchanged) {
tableRebalanceLogger.info("Table is already balanced");
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index d7e111d83d..a25d178091 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -1053,7 +1053,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(preCheckerResult.getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.WARN);
assertEquals(preCheckerResult.getMessage(),
"Number of replicas (3) is greater than 1, downtime is not
recommended.\nDowntime or minAvailableReplicas=0 "
- + "for pauseless tables may cause data loss during rebalance");
+ + "for pauseless tables may cause data loss during rebalance");
rebalanceConfig.setDowntime(false);
rebalanceConfig.setMinAvailableReplicas(-3);
@@ -1081,9 +1081,49 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(preCheckerResult.getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.PASS);
assertEquals(preCheckerResult.getMessage(), "All rebalance parameters look
good");
+ // Add more segments
+ int additionalNumSegments =
DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + 1;
+ for (int i = 0; i < additionalNumSegments; i++) {
+ _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME,
SEGMENT_NAME_PREFIX + (numSegments + i)), null);
+ }
+
+ // Add one more server instance
+ String instanceId = "preCheckerRebalanceConfig_" +
SERVER_INSTANCE_ID_PREFIX + numServers;
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+
+ // change num replicas from 3 to 4
+ newTableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(4).build();
+
+ // now the new server (the 4th server) should expect to have all the
existing segments (including consuming) added to it
+ rebalanceResult = tableRebalancer.rebalance(newTableConfig,
rebalanceConfig, null);
+ int expectedNumSegmentsToAdd =
FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS + additionalNumSegments +
numSegments;
+ assertEquals(rebalanceResult.getRebalanceSummaryResult()
+ .getServerInfo()
+ .getServerSegmentChangeInfo()
+ .get(instanceId)
+ .getSegmentsAdded(), expectedNumSegmentsToAdd);
+
assertEquals(rebalanceResult.getRebalanceSummaryResult().getSegmentInfo().getMaxSegmentsAddedToASingleServer(),
+ expectedNumSegmentsToAdd);
+ preCheckerResult =
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS);
+ assertNotNull(preCheckerResult);
+ assertEquals(preCheckerResult.getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.WARN);
+ assertEquals(preCheckerResult.getMessage(),
+ "Number of segments to add to a single server (" +
expectedNumSegmentsToAdd + ") is high (>"
+ + DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + "). It is
recommended to set batchSizePerServer to "
+ + DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE
+ + " or lower to avoid excessive load on servers.");
+
+
rebalanceConfig.setBatchSizePerServer(DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE);
+ rebalanceResult = tableRebalancer.rebalance(newTableConfig,
rebalanceConfig, null);
+ preCheckerResult =
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS);
+ assertNotNull(preCheckerResult);
+ assertEquals(preCheckerResult.getPreCheckStatus(),
RebalancePreCheckerResult.PreCheckStatus.PASS);
+ assertEquals(preCheckerResult.getMessage(), "All rebalance parameters look
good");
+
_helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
- for (int i = 0; i < numServers; i++) {
+ for (int i = 0; i < numServers + 1; i++) {
stopAndDropFakeInstance("preCheckerRebalanceConfig_" +
SERVER_INSTANCE_ID_PREFIX + i);
}
executorService.shutdown();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]