This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dccd39bc4 Check `batchSizePerServer` in Rebalance Config Pre-check (#15984)
6dccd39bc4 is described below

commit 6dccd39bc46c4d99e17979c95610deabddb735f7
Author: Jhow <[email protected]>
AuthorDate: Tue Jun 3 17:29:36 2025 -0700

    Check `batchSizePerServer` in Rebalance Config Pre-check (#15984)
    
    * Add a `batchSizePerServer` check to the rebalance config pre-check
    
    * add test
    
    * lint: style
    
    * use segmentInfo.maxSegmentsAddedToASingleServer instead
    
    * Update the test to match the previous commit
---
 .../core/rebalance/DefaultRebalancePreChecker.java | 22 ++++++++++-
 .../helix/core/rebalance/RebalancePreChecker.java  |  9 ++++-
 .../helix/core/rebalance/TableRebalancer.java      | 16 ++++----
 .../TableRebalancerClusterStatelessTest.java       | 44 +++++++++++++++++++++-
 4 files changed, 79 insertions(+), 12 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
index d53256bc18..a893ada3be 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -59,6 +59,9 @@ public class DefaultRebalancePreChecker implements 
RebalancePreChecker {
   public static final String REBALANCE_CONFIG_OPTIONS = 
"rebalanceConfigOptions";
   public static final String REPLICA_GROUPS_INFO = "replicaGroupsInfo";
 
+  public static final int SEGMENT_ADD_THRESHOLD = 200;
+  public static final int RECOMMENDED_BATCH_SIZE = 200;
+
   private static double _diskUtilizationThreshold;
 
   protected PinotHelixResourceManager _pinotHelixResourceManager;
@@ -101,7 +104,8 @@ public class DefaultRebalancePreChecker implements 
RebalancePreChecker {
             preCheckContext.getTableSubTypeSizeDetails(), 
_diskUtilizationThreshold, false));
 
     preCheckResult.put(REBALANCE_CONFIG_OPTIONS, 
checkRebalanceConfig(rebalanceConfig, tableConfig,
-        preCheckContext.getCurrentAssignment(), 
preCheckContext.getTargetAssignment()));
+        preCheckContext.getCurrentAssignment(), 
preCheckContext.getTargetAssignment(),
+        preCheckContext.getRebalanceSummaryResult()));
 
     preCheckResult.put(REPLICA_GROUPS_INFO, checkReplicaGroups(tableConfig, 
rebalanceConfig));
 
@@ -335,7 +339,8 @@ public class DefaultRebalancePreChecker implements 
RebalancePreChecker {
   }
 
   private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig 
rebalanceConfig, TableConfig tableConfig,
-      Map<String, Map<String, String>> currentAssignment, Map<String, 
Map<String, String>> targetAssignment) {
+      Map<String, Map<String, String>> currentAssignment, Map<String, 
Map<String, String>> targetAssignment,
+      RebalanceSummaryResult rebalanceSummaryResult) {
     List<String> warnings = new ArrayList<>();
     boolean pass = true;
     if (rebalanceConfig.isBestEfforts()) {
@@ -392,6 +397,19 @@ public class DefaultRebalancePreChecker implements 
RebalancePreChecker {
       warnings.add("updateTargetTier should be enabled when tier configs are 
present");
     }
 
+    // --- Batch size per server recommendation check using summary ---
+    int maxSegmentsToAddOnServer = 
rebalanceSummaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer();
+    int batchSizePerServer = rebalanceConfig.getBatchSizePerServer();
+    if (maxSegmentsToAddOnServer > SEGMENT_ADD_THRESHOLD) {
+      if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER
+          || batchSizePerServer > RECOMMENDED_BATCH_SIZE) {
+        pass = false;
+        warnings.add("Number of segments to add to a single server (" + 
maxSegmentsToAddOnServer + ") is high (>"
+            + SEGMENT_ADD_THRESHOLD + "). It is recommended to set 
batchSizePerServer to " + RECOMMENDED_BATCH_SIZE
+            + " or lower to avoid excessive load on servers.");
+      }
+    }
+
     return pass ? RebalancePreCheckerResult.pass("All rebalance parameters 
look good")
         : RebalancePreCheckerResult.warn(StringUtil.join("\n", 
warnings.toArray(String[]::new)));
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
index 83b7302d2f..91e80bb497 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java
@@ -38,10 +38,12 @@ public interface RebalancePreChecker {
     private final Map<String, Map<String, String>> _targetAssignment;
     private final TableSizeReader.TableSubTypeSizeDetails 
_tableSubTypeSizeDetails;
     private final RebalanceConfig _rebalanceConfig;
+    private final RebalanceSummaryResult _rebalanceSummaryResult;
 
     public PreCheckContext(String rebalanceJobId, String tableNameWithType, 
TableConfig tableConfig,
         Map<String, Map<String, String>> currentAssignment, Map<String, 
Map<String, String>> targetAssignment,
-        @Nullable TableSizeReader.TableSubTypeSizeDetails 
tableSubTypeSizeDetails, RebalanceConfig rebalanceConfig) {
+        @Nullable TableSizeReader.TableSubTypeSizeDetails 
tableSubTypeSizeDetails, RebalanceConfig rebalanceConfig,
+        RebalanceSummaryResult rebalanceSummaryResult) {
       _rebalanceJobId = rebalanceJobId;
       _tableNameWithType = tableNameWithType;
       _tableConfig = tableConfig;
@@ -49,6 +51,7 @@ public interface RebalancePreChecker {
       _targetAssignment = targetAssignment;
       _tableSubTypeSizeDetails = tableSubTypeSizeDetails;
       _rebalanceConfig = rebalanceConfig;
+      _rebalanceSummaryResult = rebalanceSummaryResult;
     }
 
     public String getRebalanceJobId() {
@@ -78,6 +81,10 @@ public interface RebalancePreChecker {
     public RebalanceConfig getRebalanceConfig() {
       return _rebalanceConfig;
     }
+
+    public RebalanceSummaryResult getRebalanceSummaryResult() {
+      return _rebalanceSummaryResult;
+    }
   }
 
   Map<String, RebalancePreCheckerResult> check(PreCheckContext 
preCheckContext);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 423c851dcd..3bf80c9f60 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -339,22 +339,24 @@ public class TableRebalancer {
         fetchTableSizeDetails(tableNameWithType, tableRebalanceLogger);
 
     Map<String, RebalancePreCheckerResult> preChecksResult = null;
+
+    // Calculate summary here itself so that even if the table is already 
balanced, the caller can verify whether that
+    // is expected or not based on the summary results
+    RebalanceSummaryResult summaryResult =
+        calculateDryRunSummary(currentAssignment, targetAssignment, 
tableNameWithType, tableSubTypeSizeDetails,
+            tableConfig, tableRebalanceLogger);
+
     if (preChecks) {
       if (_rebalancePreChecker == null) {
         tableRebalanceLogger.warn(
             "Pre-checks are enabled but the pre-checker is not set, skipping 
pre-checks");
       } else {
         RebalancePreChecker.PreCheckContext preCheckContext =
-            new RebalancePreChecker.PreCheckContext(rebalanceJobId, 
tableNameWithType,
-                tableConfig, currentAssignment, targetAssignment, 
tableSubTypeSizeDetails, rebalanceConfig);
+            new RebalancePreChecker.PreCheckContext(rebalanceJobId, 
tableNameWithType, tableConfig, currentAssignment,
+                targetAssignment, tableSubTypeSizeDetails, rebalanceConfig, 
summaryResult);
         preChecksResult = _rebalancePreChecker.check(preCheckContext);
       }
     }
-    // Calculate summary here itself so that even if the table is already 
balanced, the caller can verify whether that
-    // is expected or not based on the summary results
-    RebalanceSummaryResult summaryResult =
-        calculateDryRunSummary(currentAssignment, targetAssignment, 
tableNameWithType, tableSubTypeSizeDetails,
-            tableConfig, tableRebalanceLogger);
 
     if (segmentAssignmentUnchanged) {
       tableRebalanceLogger.info("Table is already balanced");
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index d7e111d83d..a25d178091 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -1053,7 +1053,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertEquals(preCheckerResult.getPreCheckStatus(), 
RebalancePreCheckerResult.PreCheckStatus.WARN);
     assertEquals(preCheckerResult.getMessage(),
         "Number of replicas (3) is greater than 1, downtime is not 
recommended.\nDowntime or minAvailableReplicas=0 "
-        + "for pauseless tables may cause data loss during rebalance");
+            + "for pauseless tables may cause data loss during rebalance");
 
     rebalanceConfig.setDowntime(false);
     rebalanceConfig.setMinAvailableReplicas(-3);
@@ -1081,9 +1081,49 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertEquals(preCheckerResult.getPreCheckStatus(), 
RebalancePreCheckerResult.PreCheckStatus.PASS);
     assertEquals(preCheckerResult.getMessage(), "All rebalance parameters look 
good");
 
+    // Add more segments
+    int additionalNumSegments = 
DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + 1;
+    for (int i = 0; i < additionalNumSegments; i++) {
+      _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, 
SEGMENT_NAME_PREFIX + (numSegments + i)), null);
+    }
+
+    // Add one more server instance
+    String instanceId = "preCheckerRebalanceConfig_" + 
SERVER_INSTANCE_ID_PREFIX + numServers;
+    addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+
+    // change num replicas from 3 to 4
+    newTableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(4).build();
+
+    // now the new server (the 4th server) should expect to be added all the 
existing segments (including consuming)
+    rebalanceResult = tableRebalancer.rebalance(newTableConfig, 
rebalanceConfig, null);
+    int expectedNumSegmentsToAdd = 
FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS + additionalNumSegments + 
numSegments;
+    assertEquals(rebalanceResult.getRebalanceSummaryResult()
+        .getServerInfo()
+        .getServerSegmentChangeInfo()
+        .get(instanceId)
+        .getSegmentsAdded(), expectedNumSegmentsToAdd);
+    
assertEquals(rebalanceResult.getRebalanceSummaryResult().getSegmentInfo().getMaxSegmentsAddedToASingleServer(),
+        expectedNumSegmentsToAdd);
+    preCheckerResult = 
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS);
+    assertNotNull(preCheckerResult);
+    assertEquals(preCheckerResult.getPreCheckStatus(), 
RebalancePreCheckerResult.PreCheckStatus.WARN);
+    assertEquals(preCheckerResult.getMessage(),
+        "Number of segments to add to a single server (" + 
expectedNumSegmentsToAdd + ") is high (>"
+            + DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + "). It is 
recommended to set batchSizePerServer to "
+            + DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE
+            + " or lower to avoid excessive load on servers.");
+
+    
rebalanceConfig.setBatchSizePerServer(DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE);
+    rebalanceResult = tableRebalancer.rebalance(newTableConfig, 
rebalanceConfig, null);
+    preCheckerResult = 
rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS);
+    assertNotNull(preCheckerResult);
+    assertEquals(preCheckerResult.getPreCheckStatus(), 
RebalancePreCheckerResult.PreCheckStatus.PASS);
+    assertEquals(preCheckerResult.getMessage(), "All rebalance parameters look 
good");
+
     _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
 
-    for (int i = 0; i < numServers; i++) {
+    for (int i = 0; i < numServers + 1; i++) {
       stopAndDropFakeInstance("preCheckerRebalanceConfig_" + 
SERVER_INSTANCE_ID_PREFIX + i);
     }
     executorService.shutdown();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to