siddharthteotia commented on a change in pull request #4446: Add support in the rebalancer for the user to provide minimum number of serving replicas
URL: https://github.com/apache/incubator-pinot/pull/4446#discussion_r309762156
 
 

 ##########
 File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/TableRebalancer.java
 ##########
 @@ -139,112 +171,293 @@ public RebalanceResult rebalance(TableConfig tableConfig, RebalanceSegmentStrate
       }
 
       if (EqualityUtils.isEqual(targetIdealState, currentIdealState)) {
-        LOGGER.info("Table {} is rebalanced.", tableNameWithType);
-
         LOGGER.info("Finished rebalancing table {} in {} ms.", 
tableNameWithType,
             TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
         
result.setIdealStateMapping(targetIdealState.getRecord().getMapFields());
         result.setPartitionAssignment(targetPartitionAssignment);
+        result.setStatus("Successfully finished rebalancing");
+        result.setStatusCode(RebalanceResult.RebalanceStatus.DONE);
         return result;
       }
 
       // if ideal state needs to change, get the next 'safe' state (based on whether downtime is OK or not)
-      IdealState nextIdealState = getNextState(currentIdealState, targetIdealState, rebalanceConfig);
+      final int currentMovements = _rebalancerStats.numSegmentMoves;
+      IdealState nextIdealState = getNextState(currentIdealState, targetIdealState, downtime, minAvailableReplicas);
+      LOGGER.info("Got new ideal state after doing {} segment movements. Will attempt to persist this in ZK. Logging the difference (if any) between new and target ideal state",
+          _rebalancerStats.numSegmentMoves - currentMovements);
+      printIdealStateDifference(targetIdealState, nextIdealState);
 
       // If the ideal state is large enough, enable compression
       if (HelixHelper.MAX_PARTITION_COUNT_IN_UNCOMPRESSED_IDEAL_STATE < nextIdealState.getPartitionSet().size()) {
         nextIdealState.getRecord().setBooleanField("enableCompression", true);
       }
 
-      // Check version and set ideal state
+      // Check version and set ideal state to nextIdealState
       try {
-        LOGGER.info("Updating IdealState for table {}", tableNameWithType);
+        LOGGER.info("Going to update current IdealState in ZK for table {}, 
current version {} and creation time {}",
+            tableNameWithType, currentIdealState.getRecord().getVersion(), 
currentIdealState.getRecord().getCreationTime());
         if (zkBaseDataAccessor
             .set(idealStateKey.getPath(), nextIdealState.getRecord(), 
currentIdealState.getRecord().getVersion(),
                 AccessOption.PERSISTENT)) {
+          LOGGER.info("Successfully persisted the ideal state for table {} in 
ZK. Will wait for External view to converge",
+              tableNameWithType);
+          ++_rebalancerStats.updatestoIdealStateInZK;
           // if we succeeded, wait for the change to stabilize
           waitForStable(tableNameWithType);
           // clear retries as it tracks failures with each idealstate update attempt
           retries = 0;
+          LOGGER.info("External view converged for the change in ideal state 
for table {}. Will start the next iteration (if any)",
+              tableNameWithType);
           continue;
         }
-        // in case of any error, we retry a bounded number of types
+        // in case of any error, we retry a bounded number of times
       } catch (ZkBadVersionException e) {
-        LOGGER.warn("Version changed while updating ideal state for resource: 
{}", tableNameWithType);
+        // we will go back in the loop and reattempt by recomputing the target 
ideal state
+        LOGGER.info("Version changed while updating ideal state for resource: 
{}, was expecting version {}",
+            tableNameWithType, currentIdealState.getRecord().getVersion());
       } catch (Exception e) {
-        LOGGER.warn("Caught exception while updating ideal state for resource: 
{}", tableNameWithType, e);
+        if (e instanceof IllegalStateException && (e.getCause() instanceof ExternalViewErrored || e.getCause() instanceof ExternalViewConvergeTimeout)) {
+          if (e.getCause() instanceof ExternalViewErrored) {
+            LOGGER.error("External view reported error for table {} after updating ideal state", tableNameWithType);
+          } else if (e.getCause() instanceof ExternalViewConvergeTimeout) {
+            LOGGER.error("Timed out while waiting for external view to converge for table {}", tableNameWithType);
+          }
+          // remove the cause as it is private and is only used to detect the exact error within this class
+          throw new IllegalStateException(e.getMessage());
+        } else {
+          LOGGER.error("Caught exception while or after updating ideal state for resource: {}", tableNameWithType, e);
+          throw e;
+        }
       }
 
+      // if we are here, we failed to persist the updated ideal state in ZK
+      // due to version mismatch, will attempt again
       previousIdealState = currentIdealState;
       if (retries++ > MAX_RETRIES) {
         LOGGER.error("Unable to rebalance table {} in {} attempts. Giving up", 
tableNameWithType, MAX_RETRIES);
+        result.setStatus("Cancelling the rebalance operation as we have 
exhausted the max number of retries after ZK version mismatch");
+        result.setStatusCode(RebalanceResult.RebalanceStatus.FAILED);
         return result;
       }
       // wait before retrying
       try {
         Thread.sleep(retryDelayMs);
       } catch (InterruptedException e) {
         LOGGER.error("Got interrupted while rebalancing table {}", 
tableNameWithType);
+        result.setStatus("Rebalancer got interrupted");
+        result.setStatusCode(RebalanceResult.RebalanceStatus.FAILED);
         Thread.currentThread().interrupt();
         return result;
       }
     }
   }
 
+  private void validateMinReplicas(final String tableNameWithType,
+      final Configuration rebalanceConfig, final IdealState idealState) {
+    final int replicas = Integer.valueOf(idealState.getReplicas());
+    if (!rebalanceConfig.getBoolean(RebalanceUserConfigConstants.DOWNTIME,
+        RebalanceUserConfigConstants.DEFAULT_DOWNTIME)) {
+      final int minReplicasToKeepUp = rebalanceConfig
+          .getInt(RebalanceUserConfigConstants.MIN_REPLICAS_TO_KEEPUP_FOR_NODOWNTIME,
+              RebalanceUserConfigConstants.DEFAULT_MIN_REPLICAS_TO_KEEPUP_FOR_NODOWNTIME);
+      if (minReplicasToKeepUp >= replicas) {
+        LOGGER.error("User specified invalid number of min replicas: {} to 
keep alive in no-downtime mode for table {}. Replica count for table {} ",
+            minReplicasToKeepUp, tableNameWithType, replicas);
+        throw new IllegalArgumentException("User specified invalid number of 
min available replicas " + minReplicasToKeepUp);
+      }
+    }
+  }
+
+  /**
+   * Print the difference between target and updated ideal state
+   * @param target target ideal state
+   * @param updated updated ideal state
+   */
+  private void printIdealStateDifference(final IdealState target, final IdealState updated) {
+    final Map<String, Map<String, String>> targetSegments = target.getRecord().getMapFields();
+    final Map<String, Map<String, String>> updatedSegments = updated.getRecord().getMapFields();
+    for (String segment : targetSegments.keySet()) {
+      final Map<String, String> targetInstanceMap = targetSegments.get(segment);
+      final Map<String, String> updatedInstanceMap = updatedSegments.get(segment);
+      if (updatedInstanceMap == null) {
+        LOGGER.info("Segment {} missing from current ideal state", segment);
+      } else {
+        MapDifference diff = Maps.difference(targetInstanceMap, updatedInstanceMap);
+        if (diff.entriesInCommon().size() > 0) {
+          LOGGER.debug("Common hosts for segment {}", segment);
+          prettyPrintMap(diff.entriesInCommon(), Level.DEBUG);
+        }
+        if (diff.entriesOnlyOnLeft().size() > 0) {
+          LOGGER.info("Hosts from target state not yet added for segment {}", 
segment);
+          prettyPrintMap(diff.entriesOnlyOnLeft(), Level.INFO);
+        }
+        if (diff.entriesOnlyOnRight().size() > 0) {
+          LOGGER.debug("Hosts from updated state not yet removed for segment 
{}", segment);
+          prettyPrintMap(diff.entriesOnlyOnRight(), Level.DEBUG);
+        }
+      }
+    }
+  }
+
   /**
    * Gets the next ideal state based on the target (rebalanced) state. If no downtime is desired, the next state
    * is set such that there is always at least one common replica for each segment between current and next state.
    */
-  private IdealState getNextState(IdealState currentState, IdealState targetState, Configuration rebalanceUserConfig) {
-
+  private IdealState getNextState(IdealState currentState, IdealState targetState,
+      boolean downtime, int minAvailableReplicas) {
     // make a copy of the ideal state so it can be updated
-    IdealState idealStateCopy = HelixHelper.cloneIdealState(currentState);
+    IdealState idealStateToUpdate = HelixHelper.cloneIdealState(currentState);
 
     Map<String, Map<String, String>> currentMapFields = currentState.getRecord().getMapFields();
     Map<String, Map<String, String>> targetMapFields = targetState.getRecord().getMapFields();
 
+    final int directUpdates = _rebalancerStats.directUpdatesToSegmentInstanceMap;
+    final int incrementalUpdates = _rebalancerStats.incrementalUpdatesToSegmentInstanceMap;
+
     for (String segmentId : targetMapFields.keySet()) {
-      updateSegmentIfNeeded(segmentId, currentMapFields.get(segmentId), targetMapFields.get(segmentId), idealStateCopy,
-          rebalanceUserConfig);
+      updateSegmentIfNeeded(segmentId, currentMapFields.get(segmentId), targetMapFields.get(segmentId),
+          idealStateToUpdate, downtime, minAvailableReplicas);
     }
 
-    return idealStateCopy;
+    LOGGER.info("Updated instance state map of {} segments by directly setting 
to target ideal state mapping",
+        _rebalancerStats.directUpdatesToSegmentInstanceMap - directUpdates);
+    LOGGER.info("Updated instance state map of {} segments incrementally by 
adding a host from target ideal state mapping",
+        _rebalancerStats.incrementalUpdatesToSegmentInstanceMap - 
incrementalUpdates);
+
+    return idealStateToUpdate;
   }
 
   /**
-   * Updates a segment mapping if needed. In "downtime" mode or if there are common elements between source and
-   * target mapping, the segment mapping is set to the target mapping directly.
-   * In a no-downtime, if there are no commmon elements, one element of the source mapping is replaced with one
-   * from the target mapping.
+   * Updates a segment mapping if needed. In "downtime" mode,
+   * the segment mapping is set to the target mapping directly.
+   *
+   * In no-downtime mode, we check a couple of things and
+   * keep a certain number of serving replicas for the segment alive,
+   * as specified in the rebalance configuration:
+   *
+   * (1) if the number of common hosts between the current and target
+   * mapping is enough to satisfy (greater than or equal to) the number
+   * of serving replicas we should keep alive, then we set to the
+   * target mapping directly (at one go) since this honors the no-downtime
+   * requirement
+   *
+   * (2) however, if there are not enough common hosts, then we don't
+   * change the mapping directly as that would result in downtime. The
+   * mapping is updated incrementally (only one change for a given
+   * invocation of this method per segment) by removing a host
+   * from the current mapping and adding a host to it from the
+   * target mapping.
+   *
+   * When the caller of this method returns (after going over all
+   * segments once), it persists the new ideal state and then determines
+   * if we have reached the target. If not, the process continues and we come
+   * here again for each segment and check whether step (1) or (2) applies
+   *
+   * @param segmentId segment id
+   * @param currentIdealStateSegmentHosts map of this segment's hosts (replicas)
+   *                                     in the current ideal state
+   * @param targetIdealStateSegmentHosts map of this segment's hosts (replicas)
+   *                                     in the target ideal state
+   * @param idealStateToUpdate the ideal state to update (the caller
+   *                           passes this as a copy of the current ideal state)
+   * @param downtime true if downtime is allowed, false otherwise
+   * @param minAvailableReplicas min number of replicas to be available
+   *                             if downtime is false
    */
   @VisibleForTesting
-  public void updateSegmentIfNeeded(String segmentId, Map<String, String> srcMap, Map<String, String> targetMap,
-      IdealState idealState, Configuration rebalanceUserConfig) {
+  public void updateSegmentIfNeeded(String segmentId, Map<String, String> currentIdealStateSegmentHosts,
+      Map<String, String> targetIdealStateSegmentHosts, IdealState idealStateToUpdate,
+      boolean downtime, int minAvailableReplicas) {
 
-    if (srcMap == null) {
+    LOGGER.debug("Will update instance map of segment: {}", segmentId);
+
+    if (currentIdealStateSegmentHosts == null) {
       //segment can be missing if retention manager has deleted it
-      LOGGER.info("Segment " + segmentId + " missing from current idealState. 
Skipping it.");
+      LOGGER.debug("Segment " + segmentId + " missing from current idealState. 
Skipping it.");
       return;
     }
 
-    if (rebalanceUserConfig
-        .getBoolean(RebalanceUserConfigConstants.DOWNTIME, RebalanceUserConfigConstants.DEFAULT_DOWNTIME)) {
-      setTargetState(idealState, segmentId, targetMap);
+    // dump additional detailed info for debugging
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Current hosts and states for segment {}", segmentId);
+      prettyPrintMap(currentIdealStateSegmentHosts, Level.DEBUG);
+      LOGGER.debug("Target hosts and states for segment {}", segmentId);
+      prettyPrintMap(targetIdealStateSegmentHosts, Level.DEBUG);
+    }
+
+    MapDifference difference = Maps.difference(targetIdealStateSegmentHosts, currentIdealStateSegmentHosts);
+
+    if (downtime) {
+      // in downtime mode, set the current ideal state to target at one go
+      LOGGER.debug("Downtime mode is enabled. Will set to target state at one 
go");
+      setTargetState(idealStateToUpdate, segmentId, 
targetIdealStateSegmentHosts);
+      ++_rebalancerStats.directUpdatesToSegmentInstanceMap;
+      _rebalancerStats.numSegmentMoves += targetIdealStateSegmentHosts.size();
       return;
     }
 
-    MapDifference difference = Maps.difference(targetMap, srcMap);
-    if (!difference.entriesInCommon().isEmpty()) {
-      // if there are entries in common, there won't be downtime
-      LOGGER.debug("Segment " + segmentId + " has common entries between 
current and expected ideal state");
-      setTargetState(idealState, segmentId, targetMap);
+    // we are in no-downtime mode
+    if (minAvailableReplicas >= currentIdealStateSegmentHosts.size()) {
 
 Review comment:
   I think so, since if we fail to write the ideal state in ZK due to a version error, then by the time we fetch the ideal state again some replicas might have gone down, and there is a possibility that minAvailableReplicas now becomes >= replicas.
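
   For context, here is a rough, hypothetical sketch of the two checks being discussed; it is not the PR's code. Only the names minAvailableReplicas/replicas and the direct-vs-incremental behaviour described in the javadoc above come from the diff; the class MinReplicaSketch, the helper methods, and the host names are made up for illustration. The point is that the min-replica validation has to be re-run against the freshly read replica count on every retry, and each segment is then either set to the target mapping in one go (enough common hosts) or moved one host at a time.

       import java.util.HashMap;
       import java.util.Map;

       // Hypothetical, simplified sketch of the no-downtime checks (not TableRebalancer code).
       public class MinReplicaSketch {

         // Re-run on every retry: the replica count read back from ZK may have shrunk in the meantime.
         static void validateMinReplicas(int minAvailableReplicas, int replicas) {
           if (minAvailableReplicas >= replicas) {
             throw new IllegalArgumentException(
                 "min available replicas " + minAvailableReplicas + " must be less than replica count " + replicas);
           }
         }

         // Step (1): enough common hosts -> jump straight to the target mapping.
         // Step (2): otherwise swap a single host per pass (remove one current-only host, add one target-only host).
         static Map<String, String> nextMapping(Map<String, String> current, Map<String, String> target,
             int minAvailableReplicas) {
           long commonHosts = current.keySet().stream().filter(target::containsKey).count();
           if (commonHosts >= minAvailableReplicas) {
             return new HashMap<>(target);
           }
           Map<String, String> next = new HashMap<>(current);
           current.keySet().stream().filter(h -> !target.containsKey(h)).findFirst().ifPresent(next::remove);
           target.entrySet().stream().filter(e -> !current.containsKey(e.getKey())).findFirst()
               .ifPresent(e -> next.put(e.getKey(), e.getValue()));
           return next;
         }

         public static void main(String[] args) {
           Map<String, String> current = Map.of("host1", "ONLINE", "host2", "ONLINE", "host3", "ONLINE");
           Map<String, String> target = Map.of("host3", "ONLINE", "host4", "ONLINE", "host5", "ONLINE");
           validateMinReplicas(2, 3);                           // would throw if replicas had dropped to 2
           System.out.println(nextMapping(current, target, 2)); // only 1 common host -> incremental step (2)
         }
       }

   In the PR these checks live in validateMinReplicas() and updateSegmentIfNeeded() shown above; the reply is about repeating that validation whenever the ideal state is re-read after a ZK version conflict.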
