siddharthteotia commented on a change in pull request #4446: Add support in the 
rebalancer for the user to provide minimum number of serving replicas
URL: https://github.com/apache/incubator-pinot/pull/4446#discussion_r309558110
 
 

 ##########
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/TableRebalancer.java
 ##########
 @@ -257,47 +451,208 @@ private void setTargetState(IdealState idealState, 
String segmentId, Map<String,
   }
 
   /**
-   * Check if IdealState = ExternalView. If its not equal, return the number 
of differing segments.
+   * Check if external view has converged to ideal state
+   * @param tableName name of table that we are rebalancing
+   * @return true if external view is same as ideal state, false otherwise
    */
-  public int isStable(String tableName) {
+  private boolean isStable(String tableName) {
     IdealState idealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, tableName);
     ExternalView externalView = 
_helixAdmin.getResourceExternalView(_helixClusterName, tableName);
     Map<String, Map<String, String>> mapFieldsIS = 
idealState.getRecord().getMapFields();
     Map<String, Map<String, String>> mapFieldsEV = 
externalView.getRecord().getMapFields();
-    int numDiff = 0;
+
+    LOGGER.info("Checking if ideal state and external view are same for table 
{}", tableName);
+    int segmentsNotConverged = 0;
+
+    boolean stable = true;
     for (String segment : mapFieldsIS.keySet()) {
       Map<String, String> mapIS = mapFieldsIS.get(segment);
       Map<String, String> mapEV = mapFieldsEV.get(segment);
+      boolean converged = true;
+
+      if (mapEV == null) {
+        LOGGER.info("Host-state mapping of segment {} not yet available in 
external view", segment);
+        // we have found that external view hasn't yet converged to ideal 
state.
+        // still go on to check for other segments just so that we can dump 
debug
+        // messages or we can detect error. that's why we don't return
+        // false immediately
+        stable = false;
+        ++segmentsNotConverged;
+        continue;
+      }
+
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Hosts and states for segment {} in ideal state", 
segment);
+        prettyPrintMap(mapIS, Level.DEBUG);
+        LOGGER.debug("Hosts and states for segment {} in external view", 
segment);
+        prettyPrintMap(mapEV, Level.DEBUG);
+      }
 
       for (String server : mapIS.keySet()) {
-        String state = mapIS.get(server);
-        if (mapEV == null || mapEV.get(server) == null || 
!mapEV.get(server).equals(state)) {
-          LOGGER.debug("Mismatch: segment" + segment + " server:" + server + " 
state:" + state);
-          numDiff = numDiff + 1;
+        if (!mapEV.containsKey(server)) {
+          LOGGER.info("Host-state mapping of segment {} doesn't yet have 
server {} in external view",
+              segment, server);
+          // external view not yet converged
+          stable = false;
+          converged = false;
+        } else if (mapEV.get(server).equalsIgnoreCase("error")) {
+          LOGGER.error("Detected error state for segment {} for server {}", 
segment, server);
+          prettyPrintMap(mapIS, Level.ERROR);
+          prettyPrintMap(mapEV, Level.ERROR);
+          throw new IllegalStateException("External view reports error state 
for segment " + segment + " for host " + server,
+              new ExternalViewErrored());
+        } else {
+          final String stateInIdealState = mapIS.get(server);
+          final String stateInExternalView = mapEV.get(server);
+          if (!stateInIdealState.equalsIgnoreCase(stateInExternalView)) {
+            LOGGER.info("Host-state mapping of segment {} has state {} in 
external view and state {} in ideal state",
+                segment, stateInExternalView, stateInIdealState);
+            // external view not yet converged
+            stable = false;
+            converged = false;
+          }
         }
       }
+
+      if (!converged) {
+        segmentsNotConverged++;
+      }
+    }
+
+    LOGGER.info("{} of total {} segments from ideal state don't yet have 
external view converged",
+        segmentsNotConverged, mapFieldsIS.size());
+    return stable;
+  }
+
+  private static void prettyPrintMap(final Map<String, String> map, final 
Level level) {
+    if (map.size() > 0) {
+      final Joiner.MapJoiner mapJoiner = 
Joiner.on(",").withKeyValueSeparator(":");
+      if (level == Level.DEBUG) {
+        LOGGER.debug(mapJoiner.join(map));
+      } else if (level == Level.ERROR) {
+        LOGGER.error(mapJoiner.join(map));
+      } else if (level == Level.INFO) {
+        LOGGER.info(mapJoiner.join(map));
+      }
     }
-    return numDiff;
   }
 
   /**
    * Wait till state has stabilized {@link #isStable(String)}
    */
-  private void waitForStable(String resourceName)
-      throws InterruptedException {
-    int diff;
+  private void waitForStable(final String resourceName) {
     int INITIAL_WAIT_MS = 3000;
-    Thread.sleep(INITIAL_WAIT_MS);
-    do {
-      diff = isStable(resourceName);
-      if (diff == 0) {
-        break;
-      } else {
-        LOGGER.info(
-            "Waiting for externalView to match idealstate for table:" + 
resourceName + " Num segments difference:"
-                + diff);
-        Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
-      }
-    } while (diff > 0);
+    int wait = 0;
+    boolean done = false;
+    try {
+      Thread.sleep(INITIAL_WAIT_MS);
+      // the isStable method will bail out on detecting ERROR state
+      // in external view. However, if the external view never converges
+      // (for some reason) and we don't see ERROR state as well, then we
+      // don't want to wait indefinitely for the external view to converge
+      // thus, we use a fixed max wait period and bail out if external
+      // view hasn't converged within that time period
+      while (wait < EXTERNVAL_VIEW_STABILIZATON_MAX_WAIT_MS) {
+        if (isStable(resourceName)) {
+          done = true;
+          break;
+        } else {
+          LOGGER.info("Waiting for externalView to match idealstate for 
table:" + resourceName);
+          Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
+          wait += EXTERNAL_VIEW_CHECK_INTERVAL_MS;
+          }
+        }
+      } catch (InterruptedException e) {
+      LOGGER.error("Rebalancer got interrupted while waiting for external view 
to converge");
+      Thread.currentThread().interrupt();
+    }
+
+    if (!done) {
+      LOGGER.error("External view didn't converge for table {} within max wait 
time of 5mins. Bailing out",
+          resourceName);
+      throw new IllegalStateException("External view didn't converge for table 
" + resourceName + " within max wait period of 5mins",
+          new ExternalViewConvergeTimeout());
+    }
+  }
+
+  /**
+   * Helper class that maintains stats that
+   * are later checked in tests to verify
+   * the behavior of the algorithm here
+   * that takes from current ideal state
+   * to a target ideal state
+   */
+  public static class RebalancerStats {
+    private int dryRun;
+    private int updatestoIdealStateInZK;
+    private int directUpdatesToSegmentInstanceMap;
+    private int incrementalUpdatesToSegmentInstanceMap;
+    private int numSegmentMoves;
+
+    RebalancerStats() {
+    }
+
+    /**
+     * Number of dry runs. Can only be 1
+     * @return
+     */
+    public int getDryRun() {
+      return dryRun;
+    }
+
+    /**
+     * Get the number of times rebalancer
+     * successfully updated the ideal
+     * state in ZK as it progresses through
+     * rebalancing.
+     * @return ideal state updates in ZK
+     */
+    public int getUpdatestoIdealStateInZK() {
+      return updatestoIdealStateInZK;
+    }
+
+    /**
+     * Get the number of times we updated the instance-state
+     * mapping across all segments in ideal state in one step. This
 
 Review comment:
   Not sure what that implies. I have updated the javadoc; please check whether 
it is clearer now. We can discuss this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to