mcvsubbu commented on a change in pull request #4990: Enhance TableRebalancer 
to be able to rebalance table under any condition
URL: https://github.com/apache/incubator-pinot/pull/4990#discussion_r368332729
 
 

 ##########
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 ##########
 @@ -175,67 +166,127 @@ public RebalanceResult rebalance(TableConfig 
tableConfig, Configuration rebalanc
     if (currentAssignment.equals(targetAssignment)) {
       LOGGER.info("Table: {} is already balanced", tableNameWithType);
       if (reassignInstances) {
-        return new RebalanceResult(RebalanceResult.Status.DONE, "Instance 
reassigned, table is already balanced",
-            instancePartitionsMap, targetAssignment);
+        if (dryRun) {
+          return new RebalanceResult(RebalanceResult.Status.DONE,
+              "Instance reassigned in dry-run mode, table is already 
balanced", instancePartitionsMap,
+              targetAssignment);
+        } else {
+          return new RebalanceResult(RebalanceResult.Status.DONE, "Instance 
reassigned, table is already balanced",
+              instancePartitionsMap, targetAssignment);
+        }
       } else {
         return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is 
already balanced", instancePartitionsMap,
             targetAssignment);
       }
     }
 
     if (dryRun) {
-      LOGGER.info("Rebalance table: {} in dry-run mode, returning the target 
assignment", tableNameWithType);
+      LOGGER.info("Rebalancing table: {} in dry-run mode, returning the target 
assignment", tableNameWithType);
       return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode", 
instancePartitionsMap, targetAssignment);
     }
 
-    int minAvailableReplicas;
-    if (rebalanceConfig.getBoolean(RebalanceConfigConstants.DOWNTIME, 
RebalanceConfigConstants.DEFAULT_DOWNTIME)) {
-      minAvailableReplicas = 0;
+    if (!downtime && !currentIdealState.isEnabled()) {
+      LOGGER.warn("Table: {} is disabled, rebalancing it with downtime", 
tableNameWithType);
+      downtime = true;
+    }
+
+    if (downtime) {
       LOGGER.info("Rebalancing table: {} with downtime", tableNameWithType);
-    } else {
-      minAvailableReplicas = 
rebalanceConfig.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
-          
RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME);
-      int numCurrentReplicas = 
currentAssignment.values().iterator().next().size();
-      int numTargetReplicas = 
targetAssignment.values().iterator().next().size();
-      // Use the smaller one to determine the min available replicas
-      int numReplicas = Math.min(numCurrentReplicas, numTargetReplicas);
-      if (minAvailableReplicas > 0) {
-        if (minAvailableReplicas >= numReplicas) {
-          LOGGER.warn(
-              "Illegal config for min available replicas: {} for table: {}, 
must be less than number of replicas (current: {}, target: {})",
-              minAvailableReplicas, tableNameWithType, numCurrentReplicas, 
numTargetReplicas);
-          return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal 
min available replicas config",
+
+      while (true) {
+        // Reuse current IdealState to update the IdealState in cluster
+        ZNRecord idealStateRecord = currentIdealState.getRecord();
+        idealStateRecord.setMapFields(targetAssignment);
+        currentIdealState.setNumPartitions(targetAssignment.size());
+        
currentIdealState.setReplicas(Integer.toString(targetAssignment.values().iterator().next().size()));
+
+        // Check version and update IdealState
+        try {
+          Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor()
+              .set(idealStatePropertyKey.getPath(), idealStateRecord, 
idealStateRecord.getVersion(),
+                  AccessOption.PERSISTENT), "Failed to update IdealState");
+          LOGGER.info("Finished rebalancing table: {} in {}ms.", 
tableNameWithType,
+              System.currentTimeMillis() - startTimeMs);
+          return new RebalanceResult(RebalanceResult.Status.DONE, "Success", 
instancePartitionsMap, targetAssignment);
+        } catch (ZkBadVersionException e) {
+          LOGGER.info("IdealState version changed for table: {}, 
re-calculating the target assignment",
+              tableNameWithType);
+          try {
+            IdealState idealState = 
_helixDataAccessor.getProperty(idealStatePropertyKey);
+            // IdealState might be null if table got deleted, throwing 
exception to abort the rebalance
+            Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
+            currentIdealState = idealState;
+            currentAssignment = currentIdealState.getRecord().getMapFields();
+            targetAssignment =
+                segmentAssignment.rebalanceTable(currentAssignment, 
instancePartitionsMap, rebalanceConfig);
+          } catch (Exception e1) {
+            LOGGER.error("Caught exception while re-calculating the target 
assignment for table: {}", tableNameWithType,
+                e1);
+            return new RebalanceResult(RebalanceResult.Status.FAILED,
+                "Caught exception while re-calculating the target assignment: 
" + e1, instancePartitionsMap,
+                targetAssignment);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while updating IdealState for table: 
{}", tableNameWithType, e);
+          return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught 
exception while updating IdealState: " + e,
               instancePartitionsMap, targetAssignment);
         }
       }
-      // If min available replicas is negative, treat it as max unavailable 
replicas
-      if (minAvailableReplicas < 0) {
-        minAvailableReplicas = Math.max(numReplicas + minAvailableReplicas, 0);
+    }
+
+    // Calculate the min available replicas for no-downtime rebalance
+    int minAvailableReplicas = 
rebalanceConfig.getInt(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME,
+        
RebalanceConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME);
+    int numCurrentReplicas = 
currentAssignment.values().iterator().next().size();
+    int numTargetReplicas = targetAssignment.values().iterator().next().size();
+    // Use the smaller one to determine the min available replicas
+    int numReplicas = Math.min(numCurrentReplicas, numTargetReplicas);
+    if (minAvailableReplicas > 0) {
+      if (minAvailableReplicas >= numReplicas) {
+        LOGGER.warn(
+            "Illegal config for min available replicas: {} for table: {}, must 
be less than number of replicas (current: {}, target: {})",
+            minAvailableReplicas, tableNameWithType, numCurrentReplicas, 
numTargetReplicas);
+        return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal min 
available replicas config",
+            instancePartitionsMap, targetAssignment);
       }
-      LOGGER.info("Rebalancing table: {} with min available replicas: {}", 
tableNameWithType, minAvailableReplicas);
     }
+    // If min available replicas is negative, treat it as max unavailable 
replicas
+    if (minAvailableReplicas < 0) {
+      minAvailableReplicas = Math.max(numReplicas + minAvailableReplicas, 0);
+    }
+    LOGGER.info("Rebalancing table: {} with min available replicas: {}", 
tableNameWithType, minAvailableReplicas);
 
     int expectedVersion = currentIdealState.getRecord().getVersion();
     while (true) {
       // Wait for ExternalView to converge before updating the next IdealState
+      IdealState idealState;
       try {
-        IdealState idealState = 
waitForExternalViewToConverge(tableNameWithType);
-        LOGGER.info("ExternalView converged for table: {}", tableNameWithType);
-        if (idealState.getRecord().getVersion() != expectedVersion) {
-          LOGGER.info(
-              "IdealState version changed while waiting for ExternalView to 
converge for table: {}, re-calculating the target assignment",
-              tableNameWithType);
+        idealState = waitForExternalViewToConverge(tableNameWithType);
 
 Review comment:
   Should we be passing the expected version into 
`waitForExternalViewToConverge`, so that if the IdealState changes due to some 
other factor we stop waiting for convergence and instead execute the logic 
below?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to