This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 08bc488e2 Waged Pipeline Redesign (#2319)
08bc488e2 is described below

commit 08bc488e2ab79644b1850a79d76b975603f63c56
Author: Neal Sun <[email protected]>
AuthorDate: Thu Dec 22 09:48:58 2022 -0800

    Waged Pipeline Redesign (#2319)
    
    Introduces Emergency Rebalance and other performance optimizations
---
 .../rebalancer/waged/AssignmentMetadataStore.java  | 116 ++++----
 .../rebalancer/waged/ReadOnlyWagedRebalancer.java  |  28 +-
 .../rebalancer/waged/WagedRebalancer.java          | 316 ++++++++++++++++-----
 .../waged/model/ClusterModelProvider.java          |  70 ++++-
 .../java/org/apache/helix/model/ClusterConfig.java |   1 +
 .../metrics/WagedRebalancerMetricCollector.java    |  20 +-
 .../waged/MockAssignmentMetadataStore.java         |  19 +-
 .../waged/TestAssignmentMetadataStore.java         |  24 --
 .../rebalancer/waged/TestWagedRebalancer.java      | 155 +++++++++-
 9 files changed, 582 insertions(+), 167 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 834fbad4e..0a532944a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -51,6 +51,7 @@ public class AssignmentMetadataStore {
   // volatile for double-checked locking
   protected volatile Map<String, ResourceAssignment> _globalBaseline;
   protected volatile Map<String, ResourceAssignment> _bestPossibleAssignment;
+  protected volatile int _bestPossibleVersion = 0;
 
   AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
     this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
@@ -99,60 +100,78 @@ public class AssignmentMetadataStore {
   }
 
   /**
-   * @return true if a new baseline was persisted.
+   * @param newAssignment
+   * @param path the path of the assignment record
+   * @param key  the key of the assignment in the record
    * @throws HelixException if the method failed to persist the baseline.
    */
-  public synchronized boolean persistBaseline(Map<String, ResourceAssignment> 
globalBaseline) {
-    return persistAssignment(globalBaseline, getBaseline(), _baselinePath, 
BASELINE_KEY);
+  private void persistAssignmentToMetadataStore(Map<String, 
ResourceAssignment> newAssignment, String path, String key)
+      throws HelixException {
+    // TODO: Make the write async?
+    // Persist to ZK
+    HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
+    try {
+      _dataAccessor.compressedBucketWrite(path, combinedAssignments);
+    } catch (IOException e) {
+      throw new HelixException(String.format("Failed to persist %s assignment 
to path %s", key, path), e);
+    }
   }
 
   /**
-   * @return true if a new best possible assignment was persisted.
-   * @throws HelixException if the method failed to persist the baseline.
+   * Persist a new baseline assignment to metadata store first, then to memory
+   * @param globalBaseline
    */
-  public synchronized boolean persistBestPossibleAssignment(
-      Map<String, ResourceAssignment> bestPossibleAssignment) {
-    return persistAssignment(bestPossibleAssignment, 
getBestPossibleAssignment(), _bestPossiblePath,
-        BEST_POSSIBLE_KEY);
+  public synchronized void persistBaseline(Map<String, ResourceAssignment> 
globalBaseline) {
+    // write to metadata store
+    persistAssignmentToMetadataStore(globalBaseline, _baselinePath, 
BASELINE_KEY);
+    // write to memory
+    getBaseline().clear();
+    getBaseline().putAll(globalBaseline);
   }
 
-  public synchronized void clearAssignmentMetadata() {
-    persistAssignment(Collections.emptyMap(), getBaseline(), _baselinePath, 
BASELINE_KEY);
-    persistAssignment(Collections.emptyMap(), getBestPossibleAssignment(), 
_bestPossiblePath,
-        BEST_POSSIBLE_KEY);
+  /**
+   * Persist a new best possible assignment to metadata store first, then to 
memory.
+   * Increment best possible version by 1 - this is a high priority in-memory 
write.
+   * @param bestPossibleAssignment
+   */
+  public synchronized void persistBestPossibleAssignment(Map<String, 
ResourceAssignment> bestPossibleAssignment) {
+    // write to metadata store
+    persistAssignmentToMetadataStore(bestPossibleAssignment, 
_bestPossiblePath, BEST_POSSIBLE_KEY);
+    // write to memory
+    getBestPossibleAssignment().clear();
+    getBestPossibleAssignment().putAll(bestPossibleAssignment);
+    _bestPossibleVersion++;
   }
 
   /**
-   * @param newAssignment
-   * @param cachedAssignment
-   * @param path the path of the assignment record
-   * @param key  the key of the assignment in the record
-   * @return true if a new assignment was persisted.
+   * Attempts to persist Best Possible Assignment in memory from an 
asynchronous thread.
+   * Persist only happens when the provided version is not stale - this is a 
low priority in-memory write.
+   * @param bestPossibleAssignment - new assignment to be persisted
+   * @param newVersion - attempted new version to write. This version is 
obtained earlier from getBestPossibleVersion()
+   * @return true if the attempt succeeded, false otherwise.
    */
-  // TODO: Enhance the return value so it is more intuitive to understand when 
the persist fails and
-  // TODO: when it is skipped.
-  private boolean persistAssignment(Map<String, ResourceAssignment> 
newAssignment,
-      Map<String, ResourceAssignment> cachedAssignment, String path,
-      String key) {
-    // TODO: Make the write async?
-    // If the assignment hasn't changed, skip writing to metadata store
-    if (compareAssignments(cachedAssignment, newAssignment)) {
-      return false;
-    }
-    // Persist to ZK
-    HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
-    try {
-      _dataAccessor.compressedBucketWrite(path, combinedAssignments);
-    } catch (IOException e) {
-      // TODO: Improve failure handling
-      throw new HelixException(
-          String.format("Failed to persist %s assignment to path %s", key, 
path), e);
+  public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+      Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
+    // Check if the version is stale by this point
+    if (newVersion > _bestPossibleVersion) {
+      getBestPossibleAssignment().clear();
+      getBestPossibleAssignment().putAll(bestPossibleAssignment);
+      _bestPossibleVersion = newVersion;
+      return true;
     }
 
-    // Update the in-memory reference
-    cachedAssignment.clear();
-    cachedAssignment.putAll(newAssignment);
-    return true;
+    return false;
+  }
+
+  public int getBestPossibleVersion() {
+    return _bestPossibleVersion;
+  }
+
+  public synchronized void clearAssignmentMetadata() {
+    persistAssignmentToMetadataStore(Collections.emptyMap(), _baselinePath, 
BASELINE_KEY);
+    persistAssignmentToMetadataStore(Collections.emptyMap(), 
_bestPossiblePath, BEST_POSSIBLE_KEY);
+    getBaseline().clear();
+    getBestPossibleAssignment().clear();
   }
 
   protected synchronized void reset() {
@@ -207,16 +226,11 @@ public class AssignmentMetadataStore {
     return assignmentMap;
   }
 
-  /**
-   * Returns whether two assignments are same.
-   * @param oldAssignment
-   * @param newAssignment
-   * @return true if they are the same. False otherwise or oldAssignment is 
null
-   */
-  protected boolean compareAssignments(Map<String, ResourceAssignment> 
oldAssignment,
-      Map<String, ResourceAssignment> newAssignment) {
-    // If oldAssignment is null, that means that we haven't read from/written 
to
-    // the metadata store yet. In that case, we return false so that we write 
to metadata store.
-    return oldAssignment != null && oldAssignment.equals(newAssignment);
+  protected boolean isBaselineChanged(Map<String, ResourceAssignment> 
newBaseline) {
+    return !getBaseline().equals(newBaseline);
+  }
+
+  protected boolean isBestPossibleChanged(Map<String, ResourceAssignment> 
newBestPossible) {
+    return !getBestPossibleAssignment().equals(newBestPossible);
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index e94148e99..e79c028ec 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -67,26 +67,30 @@ public class ReadOnlyWagedRebalancer extends 
WagedRebalancer {
     }
 
     @Override
-    public boolean persistBaseline(Map<String, ResourceAssignment> 
globalBaseline) {
-      // If baseline hasn't changed, skip updating the metadata store
-      if (compareAssignments(_globalBaseline, globalBaseline)) {
-        return false;
-      }
+    public void persistBaseline(Map<String, ResourceAssignment> 
globalBaseline) {
       // Update the in-memory reference only
       _globalBaseline = globalBaseline;
-      return true;
     }
 
     @Override
-    public boolean persistBestPossibleAssignment(
+    public void persistBestPossibleAssignment(
         Map<String, ResourceAssignment> bestPossibleAssignment) {
-      // If bestPossibleAssignment hasn't changed, skip updating the metadata 
store
-      if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) 
{
-        return false;
-      }
       // Update the in-memory reference only
       _bestPossibleAssignment = bestPossibleAssignment;
-      return true;
+      _bestPossibleVersion++;
+    }
+
+    @Override
+    public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+        Map<String, ResourceAssignment> bestPossibleAssignment, int 
newVersion) {
+      // Check if the version is stale by this point
+      if (newVersion > _bestPossibleVersion) {
+        _bestPossibleAssignment = bestPossibleAssignment;
+        _bestPossibleVersion = newVersion;
+        return true;
+      }
+
+      return false;
     }
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index a99ad26ab..3fffef2fb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
@@ -98,6 +99,7 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
 
   // To calculate the baseline asynchronously
   private final ExecutorService _baselineCalculateExecutor;
+  private final ExecutorService _bestPossibleCalculateExecutor;
   private final ResourceChangeDetector _changeDetector;
   private final HelixManager _manager;
   private final MappingCalculator<ResourceControllerDataProvider> 
_mappingCalculator;
@@ -110,10 +112,16 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
   private final LatencyMetric _writeLatency;
   private final CountMetric _partialRebalanceCounter;
   private final LatencyMetric _partialRebalanceLatency;
+  private final CountMetric _emergencyRebalanceCounter;
+  private final LatencyMetric _emergencyRebalanceLatency;
+  private final CountMetric _rebalanceOverwriteCounter;
+  private final LatencyMetric _rebalanceOverwriteLatency;
   private final LatencyMetric _stateReadLatency;
   private final BaselineDivergenceGauge _baselineDivergenceGauge;
 
   private boolean _asyncGlobalRebalanceEnabled;
+  private boolean _asyncPartialRebalanceEnabled;
+  private Future<Boolean> _asyncPartialRebalanceResult;
 
   // Note, the rebalance algorithm field is mutable so it should not be 
directly referred except for
   // the public method computeNewIdealStates.
@@ -149,7 +157,8 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
         // cluster has converged.
         helixManager == null ? new WagedRebalancerMetricCollector()
             : new 
WagedRebalancerMetricCollector(helixManager.getClusterName()),
-        ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED);
+        ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED,
+        ClusterConfig.DEFAULT_PARTIAL_REBALANCE_ASYNC_MODE_ENABLED);
     _preference = 
ImmutableMap.copyOf(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
   }
 
@@ -166,12 +175,13 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
         // If metricCollector is not provided, instantiate a version that does 
not register metrics
         // in order to allow rebalancer to proceed
         metricCollectorOptional.orElse(new WagedRebalancerMetricCollector()),
-        false);
+        false, false);
   }
 
   private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, 
HelixManager manager,
-      MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled) {
+      MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled,
+      boolean isAsyncPartialRebalanceEnabled) {
     if (assignmentMetadataStore == null) {
       LOG.warn("Assignment Metadata Store is not configured properly."
           + " The rebalancer will not access the assignment store during the 
rebalance.");
@@ -203,6 +213,16 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
         
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
             .name(),
         LatencyMetric.class);
+    _emergencyRebalanceCounter = _metricCollector.getMetric(
+        
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceCounter.name(),
 CountMetric.class);
+    _emergencyRebalanceLatency = _metricCollector.getMetric(
+        
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge.name(),
+        LatencyMetric.class);
+    _rebalanceOverwriteCounter = _metricCollector.getMetric(
+        
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteCounter.name(),
 CountMetric.class);
+    _rebalanceOverwriteLatency = _metricCollector.getMetric(
+        
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+        LatencyMetric.class);
     _writeLatency = _metricCollector.getMetric(
         
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
         LatencyMetric.class);
@@ -216,7 +236,9 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     _changeDetector = new ResourceChangeDetector(true);
 
     _baselineCalculateExecutor = Executors.newSingleThreadExecutor();
+    _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
     _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+    _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
   }
 
   // Update the global rebalance mode to be asynchronous or synchronous
@@ -224,6 +246,11 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
   }
 
+  // Update the partial rebalance mode to be asynchronous or synchronous
+  public void setPartialRebalanceAsyncMode(boolean 
isAsyncPartialRebalanceEnabled) {
+    _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+  }
+
   // Update the rebalancer preference if the new options are different from 
the current preference.
   public synchronized void updateRebalancePreference(
       Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
@@ -249,6 +276,9 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     if (_baselineCalculateExecutor != null) {
       _baselineCalculateExecutor.shutdownNow();
     }
+    if (_bestPossibleCalculateExecutor != null) {
+      _bestPossibleCalculateExecutor.shutdownNow();
+    }
     if (_assignmentMetadataStore != null) {
       _assignmentMetadataStore.close();
     }
@@ -329,22 +359,24 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     // Schedule (or unschedule) delayed rebalance according to the delayed 
rebalance config.
     delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
 
-    Map<String, IdealState> newIdealStates = 
convertResourceAssignment(clusterData,
-        computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, 
currentStateOutput,
-            algorithm));
+    Map<String, ResourceAssignment> newBestPossibleAssignment =
+        computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, 
currentStateOutput, algorithm);
+    Map<String, IdealState> newIdealStates = 
convertResourceAssignment(clusterData, newBestPossibleAssignment);
 
     // The additional rebalance overwrite is required since the calculated 
mapping may contain
     // some delayed rebalanced assignments.
-    if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
+    if (!activeNodes.equals(clusterData.getEnabledLiveInstances()) && 
requireRebalanceOverwrite(clusterData,
+        newBestPossibleAssignment)) {
       applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
-          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, 
resourceMap.keySet()),
-          algorithm);
+          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, 
resourceMap.keySet()), algorithm);
     }
     // Replace the assignment if user-defined preference list is configured.
     // Note the user-defined list is intentionally applied to the final 
mapping after calculation.
     // This is to avoid persisting it into the assignment store, which impacts 
the long term
     // assignment evenness and partition movements.
-    newIdealStates.forEach((key, value) -> 
applyUserDefinedPreferenceList(clusterData.getResourceConfig(key), value));
+    newIdealStates.forEach(
+        (resourceName, idealState) -> 
applyUserDefinedPreferenceList(clusterData.getResourceConfig(resourceName),
+            idealState));
 
     return newIdealStates;
   }
@@ -357,9 +389,10 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
       throws HelixRebalanceException {
     // Perform global rebalance for a new baseline assignment
     globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
-    // Perform partial rebalance for a new best possible assignment
+    // Perform emergency rebalance for a new best possible assignment
     Map<String, ResourceAssignment> newAssignment =
-        partialRebalance(clusterData, resourceMap, activeNodes, 
currentStateOutput, algorithm);
+        emergencyRebalance(clusterData, resourceMap, activeNodes, 
currentStateOutput, algorithm);
+
     return newAssignment;
   }
 
@@ -410,44 +443,26 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
    * @param algorithm
    * @throws HelixRebalanceException
    */
-  private void globalRebalance(ResourceControllerDataProvider clusterData,
-      Map<String, Resource> resourceMap, final CurrentStateOutput 
currentStateOutput,
-      RebalanceAlgorithm algorithm)
-      throws HelixRebalanceException {
+  private void globalRebalance(ResourceControllerDataProvider clusterData, 
Map<String, Resource> resourceMap,
+      final CurrentStateOutput currentStateOutput, RebalanceAlgorithm 
algorithm) throws HelixRebalanceException {
     _changeDetector.updateSnapshots(clusterData);
     // Get all the changed items' information. Filter for the items that have 
content changed.
-    final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
-        _changeDetector.getAllChanges();
-
-    if (clusterChanges.keySet().stream()
-        .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
-      // Build the cluster model for rebalance calculation.
-      // Note, for a Baseline calculation,
-      // 1. Ignore node status (disable/offline).
-      // 2. Use the previous Baseline as the only parameter about the previous 
assignment.
-      Map<String, ResourceAssignment> currentBaseline =
-          getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, 
resourceMap.keySet());
-      ClusterModel clusterModel;
-      try {
-        clusterModel = ClusterModelProvider
-            .generateClusterModelForBaseline(clusterData, resourceMap,
-                clusterData.getAllInstances(), clusterChanges, 
currentBaseline);
-      } catch (Exception ex) {
-        throw new HelixRebalanceException("Failed to generate cluster model 
for global rebalance.",
-            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
-      }
+    final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = 
_changeDetector.getAllChanges();
 
+    if 
(clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains))
 {
       final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
-      final String clusterName = clusterData.getClusterName();
       // Calculate the Baseline assignment for global rebalance.
       Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
         try {
-          // Note that we should schedule a new partial rebalance for a future 
rebalance pipeline if
-          // the planned partial rebalance in the current rebalance pipeline 
won't wait for the new
-          // baseline being calculated.
-          // So set shouldSchedulePartialRebalance to be 
!waitForGlobalRebalance
-          calculateAndUpdateBaseline(clusterModel, algorithm, 
!waitForGlobalRebalance, clusterName);
+          // If the synchronous thread does not wait for the baseline to be 
calculated, the synchronous thread should
+          // be triggered again after baseline is finished.
+          // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance
+          doGlobalRebalance(clusterData, resourceMap, algorithm, 
currentStateOutput, !waitForGlobalRebalance,
+              clusterChanges);
         } catch (HelixRebalanceException e) {
+          if (_asyncGlobalRebalanceEnabled) {
+            _rebalanceFailureCount.increment(1L);
+          }
           LOG.error("Failed to calculate baseline assignment!", e);
           return false;
         }
@@ -469,27 +484,40 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
 
   /**
    * Calculate and update the Baseline assignment
-   * @param clusterModel
-   * @param algorithm
-   * @param shouldSchedulePartialRebalance True if the call should trigger a 
following partial rebalance
+   * @param shouldTriggerMainPipeline True if the call should trigger a 
following main pipeline rebalance
    *                                   so the new Baseline could be applied to 
cluster.
-   * @param clusterName
-   * @throws HelixRebalanceException
    */
-  private void calculateAndUpdateBaseline(ClusterModel clusterModel, 
RebalanceAlgorithm algorithm,
-      boolean shouldSchedulePartialRebalance, String clusterName)
-      throws HelixRebalanceException {
+  private void doGlobalRebalance(ResourceControllerDataProvider clusterData, 
Map<String, Resource> resourceMap,
+      RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, 
boolean shouldTriggerMainPipeline,
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges) throws 
HelixRebalanceException {
     LOG.info("Start calculating the new baseline.");
     _baselineCalcCounter.increment(1L);
     _baselineCalcLatency.startMeasuringLatency();
 
-    boolean isBaselineChanged = false;
+    // Build the cluster model for rebalance calculation.
+    // Note, for a Baseline calculation,
+    // 1. Ignore node status (disable/offline).
+    // 2. Use the previous Baseline as the only parameter about the previous 
assignment.
+    Map<String, ResourceAssignment> currentBaseline =
+        getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, 
resourceMap.keySet());
+    ClusterModel clusterModel;
+    try {
+      clusterModel =
+          ClusterModelProvider.generateClusterModelForBaseline(clusterData, 
resourceMap, clusterData.getAllInstances(),
+              clusterChanges, currentBaseline);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to generate cluster model for 
global rebalance.",
+          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+    }
+
     Map<String, ResourceAssignment> newBaseline = 
calculateAssignment(clusterModel, algorithm);
+    boolean isBaselineChanged =
+        _assignmentMetadataStore != null && 
_assignmentMetadataStore.isBaselineChanged(newBaseline);
     // Write the new baseline to metadata store
-    if (_assignmentMetadataStore != null) {
+    if (isBaselineChanged) {
       try {
         _writeLatency.startMeasuringLatency();
-        isBaselineChanged = 
_assignmentMetadataStore.persistBaseline(newBaseline);
+        _assignmentMetadataStore.persistBaseline(newBaseline);
         _writeLatency.endMeasuringLatency();
       } catch (Exception ex) {
         throw new HelixRebalanceException("Failed to persist the new baseline 
assignment.",
@@ -501,21 +529,67 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     _baselineCalcLatency.endMeasuringLatency();
     LOG.info("Global baseline calculation completed and has been persisted 
into metadata store.");
 
-    if (isBaselineChanged && shouldSchedulePartialRebalance) {
+    if (isBaselineChanged && shouldTriggerMainPipeline) {
       LOG.info("Schedule a new rebalance after the new baseline calculation 
has finished.");
-      RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0L, false);
+      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, 
false);
     }
   }
 
-  private Map<String, ResourceAssignment> partialRebalance(
+  private void partialRebalance(
       ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
       Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
       RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
+    // If partial rebalance is async and the previous result is not completed 
yet,
+    // do not start another partial rebalance.
+    if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null
+        && !_asyncPartialRebalanceResult.isDone()) {
+      return;
+    }
+
+    _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> 
{
+      try {
+        doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
+            currentStateOutput);
+      } catch (HelixRebalanceException e) {
+        if (_asyncPartialRebalanceEnabled) {
+          _rebalanceFailureCount.increment(1L);
+        }
+        LOG.error("Failed to calculate best possible assignment!", e);
+        return false;
+      }
+      return true;
+    });
+    if (!_asyncPartialRebalanceEnabled) {
+      try {
+        if (!_asyncPartialRebalanceResult.get()) {
+          throw new HelixRebalanceException("Failed to calculate for the new 
best possible.",
+              HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        throw new HelixRebalanceException("Failed to execute new best possible 
calculation.",
+            HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+      }
+    }
+  }
+
+  /**
+   * Calculate and update the Best Possible assignment
+   */
+  private void doPartialRebalance(ResourceControllerDataProvider clusterData, 
Map<String, Resource> resourceMap,
+      Set<String> activeNodes, RebalanceAlgorithm algorithm, 
CurrentStateOutput currentStateOutput)
+      throws HelixRebalanceException {
     LOG.info("Start calculating the new best possible assignment.");
     _partialRebalanceCounter.increment(1L);
     _partialRebalanceLatency.startMeasuringLatency();
-    // TODO: Consider combining the metrics for both baseline/best possible?
+
+    int newBestPossibleAssignmentVersion = -1;
+    if (_assignmentMetadataStore != null) {
+      newBestPossibleAssignmentVersion = 
_assignmentMetadataStore.getBestPossibleVersion() + 1;
+    } else {
+      LOG.debug("Assignment Metadata Store is null. Skip getting best possible 
assignment version.");
+    }
+
     // Read the baseline from metadata store
     Map<String, ResourceAssignment> currentBaseline =
         getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, 
resourceMap.keySet());
@@ -547,20 +621,79 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     
_baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
         currentBaseline, newAssignmentCopy);
 
-    if (_assignmentMetadataStore != null) {
-      try {
-        _writeLatency.startMeasuringLatency();
-        _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
-        _writeLatency.endMeasuringLatency();
-      } catch (Exception ex) {
-        throw new HelixRebalanceException("Failed to persist the new best 
possible assignment.",
-            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-      }
+    boolean bestPossibleUpdateSuccessful = false;
+    if (_assignmentMetadataStore != null && 
_assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
+      bestPossibleUpdateSuccessful = 
_assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
+          newBestPossibleAssignmentVersion);
     } else {
       LOG.debug("Assignment Metadata Store is null. Skip persisting the 
baseline assignment.");
     }
     _partialRebalanceLatency.endMeasuringLatency();
     LOG.info("Finish calculating the new best possible assignment.");
+
+    if (bestPossibleUpdateSuccessful) {
+      LOG.info("Schedule a new rebalance after the new best possible 
calculation has finished.");
+      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, 
false);
+    }
+  }
+
+  protected Map<String, ResourceAssignment> emergencyRebalance(
+      ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
+      Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
+      RebalanceAlgorithm algorithm)
+      throws HelixRebalanceException {
+    LOG.info("Start emergency rebalance.");
+    _emergencyRebalanceCounter.increment(1L);
+    _emergencyRebalanceLatency.startMeasuringLatency();
+
+    Map<String, ResourceAssignment> currentBestPossibleAssignment =
+        getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+            resourceMap.keySet());
+
+    // Step 1: Check for permanent node down
+    AtomicBoolean allNodesActive = new AtomicBoolean(true);
+    
currentBestPossibleAssignment.values().parallelStream().forEach((resourceAssignment
 -> {
+      
resourceAssignment.getMappedPartitions().parallelStream().forEach(partition -> {
+        for (String instance : 
resourceAssignment.getReplicaMap(partition).keySet()) {
+          if (!activeNodes.contains(instance)) {
+            allNodesActive.set(false);
+            break;
+          }
+        }
+      });
+    }));
+
+
+    // Step 2: if there are permanent node downs, calculate for a new one best 
possible
+    Map<String, ResourceAssignment> newAssignment;
+    if (!allNodesActive.get()) {
+      LOG.info("Emergency rebalance responding to permanent node down.");
+      ClusterModel clusterModel;
+      try {
+        clusterModel =
+            
ClusterModelProvider.generateClusterModelForEmergencyRebalance(clusterData, 
resourceMap, activeNodes,
+                currentBestPossibleAssignment);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to generate cluster model 
for emergency rebalance.",
+            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+      }
+      newAssignment = calculateAssignment(clusterModel, algorithm);
+    } else {
+      newAssignment = currentBestPossibleAssignment;
+    }
+
+    // Step 3: persist result to metadata store
+    persistBestPossibleAssignment(newAssignment);
+    _emergencyRebalanceLatency.endMeasuringLatency();
+    LOG.info("Finish emergency rebalance");
+
+    partialRebalance(clusterData, resourceMap, activeNodes, 
currentStateOutput, algorithm);
+    if (!_asyncPartialRebalanceEnabled) {
+      newAssignment = getBestPossibleAssignment(_assignmentMetadataStore, 
currentStateOutput,
+          resourceMap.keySet());
+      persistBestPossibleAssignment(newAssignment);
+    }
+
     return newAssignment;
   }
 
@@ -684,6 +817,22 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     return currentBestAssignment;
   }
 
+  private void persistBestPossibleAssignment(Map<String, ResourceAssignment> 
bestPossibleAssignment)
+      throws HelixRebalanceException {
+    if (_assignmentMetadataStore != null && 
_assignmentMetadataStore.isBestPossibleChanged(bestPossibleAssignment)) {
+      try {
+        _writeLatency.startMeasuringLatency();
+        
_assignmentMetadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
+        _writeLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to persist the new best 
possible assignment.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    } else {
+      LOG.debug("Assignment Metadata Store is null. Skip persisting the best 
possible assignment.");
+    }
+  }
+
   /**
    * Schedule rebalance according to the delayed rebalance logic.
    * @param clusterData the current cluster data cache
@@ -710,6 +859,34 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
     }
   }
 
+  protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider 
clusterData,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    AtomicBoolean allMinActiveReplicaMet = new AtomicBoolean(true);
+    
bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> 
{
+      String resourceName = resourceAssignment.getResourceName();
+      IdealState currentIdealState = clusterData.getIdealState(resourceName);
+      Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+      int numReplica = 
currentIdealState.getReplicaCount(enabledLiveInstances.size());
+      int minActiveReplica = 
DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+          
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+              currentIdealState), currentIdealState, numReplica);
+      
resourceAssignment.getMappedPartitions().parallelStream().forEach(partition -> {
+        int enabledLivePlacementCounter = 0;
+        for (String instance : 
resourceAssignment.getReplicaMap(partition).keySet()) {
+          if (enabledLiveInstances.contains(instance)) {
+            enabledLivePlacementCounter++;
+          }
+        }
+
+        if (enabledLivePlacementCounter < Math.min(minActiveReplica, 
numReplica)) {
+          allMinActiveReplicaMet.set(false);
+        }
+      });
+    }));
+
+    return !allMinActiveReplicaMet.get();
+  }
+
   /**
    * Update the rebalanced ideal states according to the real active nodes.
    * Since the rebalancing might be done with the delayed logic, the 
rebalanced ideal states
@@ -721,10 +898,13 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
    * @param baseline the baseline assignment.
    * @param algorithm the rebalance algorithm.
    */
-  private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
+  protected void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
       ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
       Map<String, ResourceAssignment> baseline, RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
+    _rebalanceOverwriteCounter.increment(1L);
+    _rebalanceOverwriteLatency.startMeasuringLatency();
+
     ClusterModel clusterModel;
     try {
       // Note this calculation uses the baseline as the best possible 
assignment input here.
@@ -761,6 +941,8 @@ public class WagedRebalancer implements 
StatefulRebalancer<ResourceControllerDat
               Math.min(minActiveReplica, numReplica));
 
       newIdealState.setPreferenceLists(finalPreferenceLists);
+
+      _rebalanceOverwriteLatency.endMeasuringLatency();
     }
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 0fc1f2b05..ce82c3207 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -51,7 +51,31 @@ public class ClusterModelProvider {
     PARTIAL,
     // Set the rebalance scope to cover all replicas that need relocation 
based on the cluster
     // changes.
-    GLOBAL_BASELINE
+    GLOBAL_BASELINE,
+    // Set the rebalance scope to cover only replicas that are assigned to 
downed instances.
+    EMERGENCY
+  }
+
+  /**
+   * Generate a new Cluster Model object according to the current cluster 
status for emergency
+   * rebalance. The rebalance scope is configured for recovering replicas that 
are on permanently
+   * downed nodes
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be 
rebalanced. Note that any
+   *                               resources that are not in this list will be 
removed from the
+   *                               final assignment.
+   * @param activeInstances        The active instances that will be used in 
the calculation.
+   *                               Note this list can be different from the 
real active node list
+   *                               according to the rebalancer logic.
+   * @param bestPossibleAssignment The persisted Best Possible assignment that 
was generated in the
+   *                               previous rebalance.
+   * @return the new cluster model
+   */
+  public static ClusterModel 
generateClusterModelForEmergencyRebalance(ResourceControllerDataProvider 
dataProvider,
+      Map<String, Resource> resourceMap, Set<String> activeInstances,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    return generateClusterModel(dataProvider, resourceMap, activeInstances, 
Collections.emptyMap(),
+        Collections.emptyMap(), bestPossibleAssignment, 
RebalanceScopeType.EMERGENCY);
   }
 
   /**
@@ -165,6 +189,10 @@ public class ClusterModelProvider {
             findToBeAssignedReplicasByComparingWithIdealAssignment(replicaMap, 
activeInstances,
                 idealAssignment, currentAssignment, allocatedReplicas);
         break;
+      case EMERGENCY:
+        toBeAssignedReplicas = 
findToBeAssignedReplicasOnDownInstances(replicaMap, activeInstances,
+            currentAssignment, allocatedReplicas);
+        break;
       default:
         throw new HelixException("Unknown rebalance scope type: " + scopeType);
     }
@@ -391,6 +419,46 @@ public class ClusterModelProvider {
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Find replicas that were assigned to non-active nodes in the current 
assignment.
+   *
+   * @param replicaMap             A map contains all the replicas grouped by 
resource name.
+   * @param activeInstances        All the instances that are live and enabled 
according to the delay rebalance configuration.
+   * @param currentAssignment      The current assignment that was generated 
in the previous rebalance.
+   * @param allocatedReplicas      A map of <Instance -> replicas> to return 
the allocated replicas grouped by the target instance name.
+   * @return The replicas that need to be reassigned.
+   */
+  private static Set<AssignableReplica> 
findToBeAssignedReplicasOnDownInstances(
+      Map<String, Set<AssignableReplica>> replicaMap, Set<String> 
activeInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    // For any replica that are assigned to non-active instances (down 
instances), add them.
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+    for (String resourceName : replicaMap.keySet()) {
+      Map<String, Map<String, Set<String>>> stateInstanceMap = 
getStateInstanceMap(currentAssignment.get(resourceName));
+
+      for (AssignableReplica replica : replicaMap.get(resourceName)) {
+        String partitionName = replica.getPartitionName();
+        String replicaState = replica.getReplicaState();
+        Set<String> currentAllocations =
+            stateInstanceMap.getOrDefault(partitionName, 
Collections.emptyMap())
+                .getOrDefault(replicaState, Collections.emptySet());
+        if (!currentAllocations.isEmpty()) {
+          String allocatedInstance = currentAllocations.iterator().next();
+          if (activeInstances.contains(allocatedInstance)) {
+            allocatedReplicas.computeIfAbsent(allocatedInstance, key -> new 
HashSet<>()).add(replica);
+          }
+          else {
+            toBeAssignedReplicas.add(replica);
+          }
+          currentAllocations.remove(allocatedInstance);
+        }
+      }
+    }
+
+    return toBeAssignedReplicas;
+  }
+
   /**
    * Filter to remove all invalid allocations that are not on the active 
instances.
    * @param assignment
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 829724142..c4f9f914f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -184,6 +184,7 @@ public class ClusterConfig extends HelixProperty {
   private final static int MAX_REBALANCE_PREFERENCE = 1000;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = 
true;
+  public final static boolean DEFAULT_PARTIAL_REBALANCE_ASYNC_MODE_ENABLED = 
true;
   private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
   private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET 
= -1;
   private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30;
diff --git 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
index dcd9a0857..94a99c95a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
+++ 
b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -43,6 +43,8 @@ public class WagedRebalancerMetricCollector extends 
MetricCollector {
     // Per-stage latency metrics
     GlobalBaselineCalcLatencyGauge,
     PartialRebalanceLatencyGauge,
+    EmergencyRebalanceLatencyGauge,
+    RebalanceOverwriteLatencyGauge,
 
     // The following latency metrics are related to AssignmentMetadataStore
     StateReadLatencyGauge,
@@ -61,7 +63,9 @@ public class WagedRebalancerMetricCollector extends 
MetricCollector {
 
     // Waged rebalance counters.
     GlobalBaselineCalcCounter,
-    PartialRebalanceCounter
+    PartialRebalanceCounter,
+    EmergencyRebalanceCounter,
+    RebalanceOverwriteCounter
   }
 
   public WagedRebalancerMetricCollector(String clusterName) {
@@ -97,6 +101,12 @@ public class WagedRebalancerMetricCollector extends 
MetricCollector {
     LatencyMetric partialRebalanceLatencyGauge =
         new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(),
             getResetIntervalInMs());
+    LatencyMetric emergencyRebalanceLatencyGauge =
+        new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge.name(),
+            getResetIntervalInMs());
+    LatencyMetric rebalanceOverwriteLatencyGauge =
+        new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+            getResetIntervalInMs());
     LatencyMetric stateReadLatencyGauge =
         new 
RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
             getResetIntervalInMs());
@@ -111,15 +121,23 @@ public class WagedRebalancerMetricCollector extends 
MetricCollector {
         new 
RebalanceCounter(WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name());
     CountMetric partialRebalanceCounter =
         new 
RebalanceCounter(WagedRebalancerMetricNames.PartialRebalanceCounter.name());
+    CountMetric emergencyRebalanceCounter =
+        new 
RebalanceCounter(WagedRebalancerMetricNames.EmergencyRebalanceCounter.name());
+    CountMetric rebalanceOverwriteCounter =
+        new 
RebalanceCounter(WagedRebalancerMetricNames.RebalanceOverwriteCounter.name());
 
     // Add metrics to WagedRebalancerMetricCollector
     addMetric(globalBaselineCalcLatencyGauge);
     addMetric(partialRebalanceLatencyGauge);
+    addMetric(emergencyRebalanceLatencyGauge);
+    addMetric(rebalanceOverwriteLatencyGauge);
     addMetric(stateReadLatencyGauge);
     addMetric(stateWriteLatencyGauge);
     addMetric(baselineDivergenceGauge);
     addMetric(calcFailureCount);
     addMetric(globalBaselineCalcCounter);
     addMetric(partialRebalanceCounter);
+    addMetric(emergencyRebalanceCounter);
+    addMetric(rebalanceOverwriteCounter);
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index a2220a5d4..fb59b8146 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -39,19 +39,30 @@ public class MockAssignmentMetadataStore extends 
AssignmentMetadataStore {
     return _globalBaseline == null ? Collections.emptyMap() : _globalBaseline;
   }
 
  // In-memory mock: simply replace the cached baseline; no metadata store write is performed.
  public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
    _globalBaseline = globalBaseline;
  }
 
   public Map<String, ResourceAssignment> getBestPossibleAssignment() {
     return _bestPossibleAssignment == null ? Collections.emptyMap() : 
_bestPossibleAssignment;
   }
 
-  public boolean persistBestPossibleAssignment(
+  public void persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     _bestPossibleAssignment = bestPossibleAssignment;
-    return true;
+    _bestPossibleVersion++;
+  }
+
+  public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+      Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
+    // Check if the version is stale by this point
+    if (newVersion > _bestPossibleVersion) {
+      _bestPossibleAssignment = bestPossibleAssignment;
+      _bestPossibleVersion = newVersion;
+      return true;
+    }
+
+    return false;
   }
 
   public void close() {
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 84bc82957..7c5740423 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -96,31 +96,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     Assert.assertTrue(_store.getBestPossibleAssignment().isEmpty());
   }
 
-  /**
-   * Test that if the old assignment and new assignment are the same,
-   */
   @Test(dependsOnMethods = "testReadEmptyBaseline")
-  public void testAvoidingRedundantWrite() {
-    Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
-
-    // Call persist functions
-    _store.persistBaseline(dummyAssignment);
-    _store.persistBestPossibleAssignment(dummyAssignment);
-
-    // Check that only one version exists
-    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
-    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 
1);
-
-    // Call persist functions again
-    _store.persistBaseline(dummyAssignment);
-    _store.persistBestPossibleAssignment(dummyAssignment);
-
-    // Check that only one version exists still
-    Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
-    Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 
1);
-  }
-
-  @Test(dependsOnMethods = "testAvoidingRedundantWrite")
   public void testAssignmentCache() {
     Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
     // Call persist functions
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index cfcbe298e..d011c5c2e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -20,9 +20,12 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import java.io.IOException;
+import java.sql.Array;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -32,8 +35,10 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import 
org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
 import 
org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -50,6 +55,7 @@ import 
org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -255,8 +261,8 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
     Map<String, ResourceAssignment> testResourceAssignmentMap = new 
HashMap<>();
     ZNRecord mappingNode = new ZNRecord(_resourceNames.get(0));
     HashMap<String, String> mapping = new HashMap<>();
-    mapping.put(_partitionNames.get(0), "MASTER");
-    mappingNode.setMapField(_testInstanceId, mapping);
+    mapping.put(_testInstanceId, "MASTER");
+    mappingNode.setMapField(_partitionNames.get(0), mapping);
     testResourceAssignmentMap.put(_resourceNames.get(0), new 
ResourceAssignment(mappingNode));
 
     _metadataStore.reset();
@@ -373,9 +379,9 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
           clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), 
_algorithm);
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
-      Assert.assertEquals(ex.getFailureType(), 
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
+      Assert.assertEquals(ex.getFailureType(), 
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
       Assert.assertEquals(ex.getMessage(),
-          "Failed to generate cluster model for partial rebalance. Failure 
Type: INVALID_CLUSTER_STATUS");
+          "Failed to calculate for the new best possible. Failure Type: 
FAILED_TO_CALCULATE");
     }
 
     // The rebalance will be done with empty mapping result since there is no 
previously calculated
@@ -389,7 +395,7 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
   public void testInvalidRebalancerStatus() throws IOException {
     // Mock a metadata store that will fail on all the calls.
     AssignmentMetadataStore metadataStore = 
Mockito.mock(AssignmentMetadataStore.class);
-    when(metadataStore.getBaseline())
+    when(metadataStore.getBestPossibleAssignment())
         .thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
     WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, 
_algorithm, Optional.empty());
 
@@ -404,7 +410,7 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
       Assert.assertEquals(ex.getFailureType(),
           HelixRebalanceException.Type.INVALID_REBALANCER_STATUS);
       Assert.assertEquals(ex.getMessage(),
-          "Failed to get the current baseline assignment because of unexpected 
error. Failure Type: INVALID_REBALANCER_STATUS");
+          "Failed to get the current best possible assignment because of 
unexpected error. Failure Type: INVALID_REBALANCER_STATUS");
     }
   }
 
@@ -439,7 +445,7 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), 
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
-      Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: 
FAILED_TO_CALCULATE");
+      Assert.assertEquals(ex.getMessage(), "Failed to calculate for the new 
best possible. Failure Type: FAILED_TO_CALCULATE");
     }
     // But if call with the public method computeNewIdealStates(), the 
rebalance will return with
     // the previous rebalance result.
@@ -575,6 +581,141 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
     Assert.assertEquals(bestPossibleAssignment, newAlgorithmResult);
   }
 
+  @Test(dependsOnMethods = "testRebalance")
+  public void testEmergencyRebalance() throws IOException, 
HelixRebalanceException {
+    _metadataStore.reset();
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    MockRebalanceAlgorithm spyAlgorithm = Mockito.spy(new 
MockRebalanceAlgorithm());
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, 
spyAlgorithm, Optional.empty());
+
+    // Cluster config change will trigger baseline to be recalculated.
+    when(clusterData.getRefreshedChangeTypes())
+        
.thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    Map<String, Resource> resourceMap =
+        
clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
+          return resource;
+        }));
+    // Populate best possible assignment
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new 
CurrentStateOutput());
+    // Global Rebalance once, Partial Rebalance once
+    verify(spyAlgorithm, times(2)).calculate(any());
+
+    // Artificially insert an offline node in the best possible assignment
+    Map<String, ResourceAssignment> bestPossibleAssignment =
+        _metadataStore.getBestPossibleAssignment();
+    String offlineResource = _resourceNames.get(0);
+    String offlinePartition = _partitionNames.get(0);
+    String offlineState = "MASTER";
+    String offlineInstance = "offlineInstance";
+    for (Partition partition : 
bestPossibleAssignment.get(offlineResource).getMappedPartitions()) {
+      if (partition.getPartitionName().equals(offlinePartition)) {
+        bestPossibleAssignment.get(offlineResource)
+            .addReplicaMap(partition, 
Collections.singletonMap(offlineInstance, offlineState));
+      }
+    }
+    _metadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
+
+    // This should trigger both emergency rebalance and partial rebalance
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new 
CurrentStateOutput());
+    ArgumentCaptor<ClusterModel> capturedClusterModel = 
ArgumentCaptor.forClass(ClusterModel.class);
+    // 2 from previous case, Emergency + Partial from this case, 4 in total
+    verify(spyAlgorithm, times(4)).calculate(capturedClusterModel.capture());
+    // In the cluster model for Emergency rebalance, the assignableReplica is 
the offline one
+    ClusterModel clusterModelForEmergencyRebalance = 
capturedClusterModel.getAllValues().get(2);
+    
Assert.assertEquals(clusterModelForEmergencyRebalance.getAssignableReplicaMap().size(),
 1);
+    
Assert.assertEquals(clusterModelForEmergencyRebalance.getAssignableReplicaMap().get(offlineResource).size(),
 1);
+    AssignableReplica assignableReplica =
+        
clusterModelForEmergencyRebalance.getAssignableReplicaMap().get(offlineResource).iterator().next();
+    Assert.assertEquals(assignableReplica.getPartitionName(), 
offlinePartition);
+    Assert.assertEquals(assignableReplica.getReplicaState(), offlineState);
+
+    bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
+    for (Map.Entry<String, ResourceAssignment> entry : 
bestPossibleAssignment.entrySet()) {
+      ResourceAssignment resourceAssignment = entry.getValue();
+      for (Partition partition : resourceAssignment.getMappedPartitions()) {
+        for (String instance: 
resourceAssignment.getReplicaMap(partition).keySet()) {
+          Assert.assertNotSame(instance, offlineInstance);
+        }
+      }
+    }
+  }
+
  @Test(dependsOnMethods = "testRebalance")
  public void testRebalanceOverwriteTrigger() throws IOException, HelixRebalanceException {
    _metadataStore.reset();

    ResourceControllerDataProvider clusterData = setupClusterDataCache();
    // Enable delay rebalance
    ClusterConfig clusterConfig = clusterData.getClusterConfig();
    clusterConfig.setDelayRebalaceEnabled(true);
    clusterConfig.setRebalanceDelayTime(1);
    clusterData.setClusterConfig(clusterConfig);

    // force create a fake offlineInstance that's in delay window
    Set<String> instances = new HashSet<>(_instances);
    String offlineInstance = "offlineInstance";
    instances.add(offlineInstance);
    when(clusterData.getAllInstances()).thenReturn(instances);
    Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
    // Offline timestamp far in the future keeps the instance inside the delay window.
    instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
    when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
    instanceConfigMap.put(offlineInstance, createMockInstanceConfig(offlineInstance));
    when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);

    // Set minActiveReplica to 0 so that requireRebalanceOverwrite returns false
    Map<String, IdealState> isMap = new HashMap<>();
    for (String resource : _resourceNames) {
      IdealState idealState = clusterData.getIdealState(resource);
      idealState.setMinActiveReplicas(0);
      isMap.put(resource, idealState);
    }
    when(clusterData.getIdealState(anyString())).thenAnswer(
        (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
    when(clusterData.getIdealStates()).thenReturn(isMap);

    MockRebalanceAlgorithm spyAlgorithm = Mockito.spy(new MockRebalanceAlgorithm());
    WagedRebalancer rebalancer = Mockito.spy(new WagedRebalancer(_metadataStore, spyAlgorithm, Optional.empty()));

    // Cluster config change will trigger baseline to be recalculated.
    when(clusterData.getRefreshedChangeTypes())
        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
    Map<String, Resource> resourceMap =
        clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
          Resource resource = new Resource(entry.getKey());
          entry.getValue().getPartitionSet().forEach(resource::addPartition);
          return resource;
        }));
    // Populate best possible assignment
    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
    verify(rebalancer, times(1)).requireRebalanceOverwrite(any(), any());
    verify(rebalancer, times(0)).applyRebalanceOverwrite(any(), any(), any(), any(), any());

    // Set minActiveReplica to 3 so that requireRebalanceOverwrite returns true
    for (String resource : _resourceNames) {
      IdealState idealState = clusterData.getIdealState(resource);
      idealState.setMinActiveReplicas(3);
      isMap.put(resource, idealState);
    }
    when(clusterData.getIdealState(anyString())).thenAnswer(
        (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
    when(clusterData.getIdealStates()).thenReturn(isMap);

    _metadataStore.reset();
    // Update the config so the cluster config will be marked as changed.
    clusterConfig = clusterData.getClusterConfig();
    Map<String, Integer> defaultCapacityMap =
        new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
    defaultCapacityMap.put("foobar", 0);
    clusterConfig.setDefaultInstanceCapacityMap(defaultCapacityMap);
    clusterData.setClusterConfig(clusterConfig);
    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
    verify(rebalancer, times(2)).requireRebalanceOverwrite(any(), any());
    verify(rebalancer, times(1)).applyRebalanceOverwrite(any(), any(), any(), any(), any());
  }
+
   @Test(dependsOnMethods = "testRebalance")
   public void testReset() throws IOException, HelixRebalanceException {
     _metadataStore.reset();

Reply via email to