This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 302e1e7  Asynchronously calculating the Baseline (#632)
302e1e7 is described below

commit 302e1e7ef6df047a8b08a66282a9ba810bc96690
Author: Jiajun Wang <[email protected]>
AuthorDate: Fri Dec 20 16:42:04 2019 -0800

    Asynchronously calculating the Baseline (#632)
    
    * Enable the Baseline calculation to be done asynchronously.
    
    This will greatly speed up the rebalance. Basically, the WAGED 
rebalancer will first run a partial rebalance to recover the invalid replica 
allocations (for example, the ones that are on a disabled instance). Then it 
calculates the new baseline by global rebalancing.
---
 .../rebalancer/waged/AssignmentMetadataStore.java  |  24 ++-
 .../rebalancer/waged/WagedRebalancer.java          | 109 ++++++++++---
 .../stages/BestPossibleStateCalcStage.java         |  17 +-
 .../BestPossibleExternalViewVerifier.java          |  14 +-
 .../waged/MockAssignmentMetadataStore.java         |   6 +-
 .../rebalancer/TestMixedModeAutoRebalance.java     | 181 +++++++++++----------
 .../TestMixedModeWagedRebalance.java               |  10 +-
 .../TestWagedExpandCluster.java                    |   3 +-
 8 files changed, 229 insertions(+), 135 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 843d1b6..6128280 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -92,11 +92,17 @@ public class AssignmentMetadataStore {
     return _bestPossibleAssignment;
   }
 
-  public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+  /**
+   * @return true if a new baseline was persisted.
+   * @throws HelixException if the method failed to persist the baseline.
+   */
+  // TODO: Enhance the return value so it is more intuitive to understand when 
the persist fails and
+  // TODO: when it is skipped.
+  public boolean persistBaseline(Map<String, ResourceAssignment> 
globalBaseline) {
     // TODO: Make the write async?
     // If baseline hasn't changed, skip writing to metadata store
     if (compareAssignments(_globalBaseline, globalBaseline)) {
-      return;
+      return false;
     }
     // Persist to ZK
     HelixProperty combinedAssignments = combineAssignments(BASELINE_KEY, 
globalBaseline);
@@ -109,14 +115,21 @@ public class AssignmentMetadataStore {
 
     // Update the in-memory reference
     _globalBaseline = globalBaseline;
+    return true;
   }
 
-  public void persistBestPossibleAssignment(
+  /**
+   * @return true if a new best possible assignment was persisted.
+   * @throws HelixException if the method failed to persist the best possible assignment.
+   */
+  // TODO: Enhance the return value so it is more intuitive to understand when 
the persist fails and
+  // TODO: when it is skipped.
+  public boolean persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     // TODO: Make the write async?
     // If bestPossibleAssignment hasn't changed, skip writing to metadata store
     if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
-      return;
+      return false;
     }
     // Persist to ZK
     HelixProperty combinedAssignments =
@@ -130,6 +143,7 @@ public class AssignmentMetadataStore {
 
     // Update the in-memory reference
     _bestPossibleAssignment = bestPossibleAssignment;
+    return true;
   }
 
   protected void finalize() {
@@ -179,7 +193,7 @@ public class AssignmentMetadataStore {
    * @param newAssignment
    * @return true if they are the same. False otherwise or oldAssignment is 
null
    */
-  private boolean compareAssignments(Map<String, ResourceAssignment> 
oldAssignment,
+  protected boolean compareAssignments(Map<String, ResourceAssignment> 
oldAssignment,
       Map<String, ResourceAssignment> newAssignment) {
     // If oldAssignment is null, that means that we haven't read from/written 
to
     // the metadata store yet. In that case, we return false so that we write 
to metadata store.
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index b05287e..8fb22d8 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -26,6 +26,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
@@ -54,6 +58,7 @@ import 
org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import 
org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,6 +84,8 @@ public class WagedRebalancer {
       .of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
           ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);
 
+  // To calculate the baseline asynchronously
+  private final ExecutorService _baselineCalculateExecutor;
   private final ResourceChangeDetector _changeDetector;
   private final HelixManager _manager;
   private final MappingCalculator<ResourceControllerDataProvider> 
_mappingCalculator;
@@ -86,14 +93,16 @@ public class WagedRebalancer {
 
   private final MetricCollector _metricCollector;
   private final CountMetric _rebalanceFailureCount;
-  private final CountMetric _globalBaselineCalcCounter;
-  private final LatencyMetric _globalBaselineCalcLatency;
+  private final CountMetric _baselineCalcCounter;
+  private final LatencyMetric _baselineCalcLatency;
   private final LatencyMetric _writeLatency;
   private final CountMetric _partialRebalanceCounter;
   private final LatencyMetric _partialRebalanceLatency;
   private final LatencyMetric _stateReadLatency;
   private final BaselineDivergenceGauge _baselineDivergenceGauge;
 
+  private boolean _asyncGlobalRebalanceEnabled;
+
   // Note, the rebalance algorithm field is mutable so it should not be 
directly referred except for
   // the public method computeNewIdealStates.
   private RebalanceAlgorithm _rebalanceAlgorithm;
@@ -109,7 +118,8 @@ public class WagedRebalancer {
   }
 
   public WagedRebalancer(HelixManager helixManager,
-      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference) {
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference,
+      boolean isAsyncGlobalRebalanceEnabled) {
     this(helixManager == null ? null
             : 
constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
                 helixManager.getClusterName()), 
ConstraintBasedAlgorithmFactory.getInstance(preference),
@@ -127,7 +137,8 @@ public class WagedRebalancer {
         // CurrentState-based rebalancing. 2. Tests that require instrumenting 
the rebalancer for
         // verifying whether the cluster has converged.
         helixManager == null ? null
-            : new 
WagedRebalancerMetricCollector(helixManager.getClusterName()));
+            : new 
WagedRebalancerMetricCollector(helixManager.getClusterName()),
+        isAsyncGlobalRebalanceEnabled);
     _preference = ImmutableMap.copyOf(preference);
   }
 
@@ -140,7 +151,7 @@ public class WagedRebalancer {
    */
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm) {
-    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), 
null, null);
+    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), 
null, null, false);
   }
 
   /**
@@ -152,12 +163,13 @@ public class WagedRebalancer {
    */
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm, MetricCollector metricCollector) {
-    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), 
null, metricCollector);
+    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), 
null, metricCollector,
+        false);
   }
 
   private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, 
HelixManager manager,
-      MetricCollector metricCollector) {
+      MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled) {
     if (assignmentMetadataStore == null) {
       LOG.warn("Assignment Metadata Store is not configured properly."
           + " The rebalancer will not access the assignment store during the 
rebalance.");
@@ -174,10 +186,10 @@ public class WagedRebalancer {
     _rebalanceFailureCount = _metricCollector.getMetric(
         
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
         CountMetric.class);
-    _globalBaselineCalcCounter = _metricCollector.getMetric(
+    _baselineCalcCounter = _metricCollector.getMetric(
         
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
         CountMetric.class);
-    _globalBaselineCalcLatency = _metricCollector.getMetric(
+    _baselineCalcLatency = _metricCollector.getMetric(
         
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge
             .name(),
         LatencyMetric.class);
@@ -199,23 +211,32 @@ public class WagedRebalancer {
         BaselineDivergenceGauge.class);
 
     _changeDetector = new ResourceChangeDetector(true);
+
+    _baselineCalculateExecutor = Executors.newSingleThreadExecutor();
+    _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+  }
+
+  // Update the global rebalance mode to be asynchronous or synchronous
+  public void setGlobalRebalanceAsyncMode(boolean 
isAsyncGlobalRebalanceEnabled) {
+    _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
   }
 
-  // Update the rebalancer preference configuration if the new preference is 
different from the
-  // current preference configuration.
-  public void updatePreference(
+  // Update the rebalancer preference if the new options are different from 
the current preference.
+  public synchronized void updateRebalancePreference(
       Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
-    if (_preference.equals(NOT_CONFIGURED_PREFERENCE) || 
_preference.equals(newPreference)) {
-      // 1. if the preference was not configured during constructing, no need 
to update.
-      // 2. if the preference equals to the new preference, no need to update.
-      return;
+    // 1. if the preference was not configured during constructing, no need to 
update.
+    // 2. if the preference equals to the new preference, no need to update.
+    if (!_preference.equals(NOT_CONFIGURED_PREFERENCE) && 
!_preference.equals(newPreference)) {
+      _rebalanceAlgorithm = 
ConstraintBasedAlgorithmFactory.getInstance(newPreference);
+      _preference = ImmutableMap.copyOf(newPreference);
     }
-    _rebalanceAlgorithm = 
ConstraintBasedAlgorithmFactory.getInstance(newPreference);
-    _preference = ImmutableMap.copyOf(newPreference);
   }
 
   // Release all the resources.
   public void close() {
+    if (_baselineCalculateExecutor != null) {
+      _baselineCalculateExecutor.shutdownNow();
+    }
     if (_assignmentMetadataStore != null) {
       _assignmentMetadataStore.close();
     }
@@ -295,7 +316,7 @@ public class WagedRebalancer {
     return newIdealStates;
   }
 
-  // Coordinate baseline recalculation and partial rebalance according to the 
cluster changes.
+  // Coordinate global rebalance and partial rebalance according to the 
cluster changes.
   private Map<String, IdealState> computeBestPossibleStates(
       ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
       final CurrentStateOutput currentStateOutput, RebalanceAlgorithm 
algorithm)
@@ -413,7 +434,33 @@ public class WagedRebalancer {
             HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
       }
 
-      calculateAndUpdateBaseline(clusterModel, algorithm);
+      final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
+      final String clusterName = clusterData.getClusterName();
+      // Calculate the Baseline assignment for global rebalance.
+      Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
+        try {
+          // Note that we should schedule a new partial rebalance for a future 
rebalance pipeline if
+          // the planned partial rebalance in the current rebalance pipeline 
won't wait for the new
+          // baseline being calculated.
+          // So set shouldSchedulePartialRebalance to be 
!waitForGlobalRebalance
+          calculateAndUpdateBaseline(clusterModel, algorithm, 
!waitForGlobalRebalance, clusterName);
+        } catch (HelixRebalanceException e) {
+          LOG.error("Failed to calculate baseline assignment!", e);
+          return false;
+        }
+        return true;
+      });
+      if (waitForGlobalRebalance) {
+        try {
+          if (!result.get()) {
+            throw new HelixRebalanceException("Failed to calculate for the new 
Baseline.",
+                HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+          }
+        } catch (InterruptedException | ExecutionException e) {
+          throw new HelixRebalanceException("Failed to execute new Baseline 
calculation.",
+              HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+        }
+      }
     }
   }
 
@@ -421,20 +468,25 @@ public class WagedRebalancer {
    * Calculate and update the Baseline assignment
    * @param clusterModel
    * @param algorithm
+   * @param shouldSchedulePartialRebalance True if the call should trigger a 
following partial rebalance
+   *                                   so the new Baseline could be applied to 
cluster.
+   * @param clusterName
    * @throws HelixRebalanceException
    */
-  private void calculateAndUpdateBaseline(ClusterModel clusterModel, 
RebalanceAlgorithm algorithm)
+  private void calculateAndUpdateBaseline(ClusterModel clusterModel, 
RebalanceAlgorithm algorithm,
+      boolean shouldSchedulePartialRebalance, String clusterName)
       throws HelixRebalanceException {
     LOG.info("Start calculating the new baseline.");
-    _globalBaselineCalcCounter.increment(1L);
-    _globalBaselineCalcLatency.startMeasuringLatency();
+    _baselineCalcCounter.increment(1L);
+    _baselineCalcLatency.startMeasuringLatency();
 
+    boolean isBaselineChanged = false;
     Map<String, ResourceAssignment> newBaseline = 
calculateAssignment(clusterModel, algorithm);
     // Write the new baseline to metadata store
     if (_assignmentMetadataStore != null) {
       try {
         _writeLatency.startMeasuringLatency();
-        _assignmentMetadataStore.persistBaseline(newBaseline);
+        isBaselineChanged = 
_assignmentMetadataStore.persistBaseline(newBaseline);
         _writeLatency.endMeasuringLatency();
       } catch (Exception ex) {
         throw new HelixRebalanceException("Failed to persist the new baseline 
assignment.",
@@ -443,8 +495,13 @@ public class WagedRebalancer {
     } else {
       LOG.debug("Assignment Metadata Store is null. Skip persisting the 
baseline assignment.");
     }
-    _globalBaselineCalcLatency.endMeasuringLatency();
-    LOG.info("Finish calculating the new baseline.");
+    _baselineCalcLatency.endMeasuringLatency();
+    LOG.info("Global baseline calculation completed and has been persisted 
into metadata store.");
+
+    if (isBaselineChanged && shouldSchedulePartialRebalance) {
+      LOG.info("Schedule a new rebalance after the new baseline calculation 
has finished.");
+      RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0L, false);
+    }
   }
 
   private Map<String, ResourceAssignment> partialRebalance(
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index fa580b7..c2fdfb3 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -121,14 +121,17 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
   }
 
   private WagedRebalancer getWagedRebalancer(HelixManager helixManager,
-      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences,
+      boolean isAsyncGlobalRebalanceEnabled) {
     // Create WagedRebalancer instance if it hasn't been already initialized
     if (_wagedRebalancer == null) {
-      _wagedRebalancer = new WagedRebalancer(helixManager, preferences);
+      _wagedRebalancer =
+          new WagedRebalancer(helixManager, preferences, 
isAsyncGlobalRebalanceEnabled);
     } else {
-      // Since the preference can be updated at runtime, try to update the 
algorithm preference
-      // before returning the rebalancer.
-      _wagedRebalancer.updatePreference(preferences);
+      // Since the rebalance configuration can be updated at runtime, try to 
update the rebalancer
+      // before returning.
+      _wagedRebalancer.updateRebalancePreference(preferences);
+      
_wagedRebalancer.setGlobalRebalanceAsyncMode(isAsyncGlobalRebalanceEnabled);
     }
     return _wagedRebalancer;
   }
@@ -281,8 +284,10 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
 
     Map<String, IdealState> newIdealStates = new HashMap<>();
 
+    ClusterConfig clusterConfig = cache.getClusterConfig();
     WagedRebalancer wagedRebalancer =
-        getWagedRebalancer(helixManager, 
cache.getClusterConfig().getGlobalRebalancePreference());
+        getWagedRebalancer(helixManager, 
clusterConfig.getGlobalRebalancePreference(),
+            clusterConfig.isGlobalRebalanceAsyncModeEnabled());
     try {
       newIdealStates.putAll(wagedRebalancer.computeNewIdealStates(cache, 
wagedRebalancedResourceMap,
           currentStateOutput));
diff --git 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 74554e9..9f44a37 100644
--- 
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -443,16 +443,26 @@ public class BestPossibleExternalViewVerifier extends 
ZkHelixClusterVerifier {
     }
 
     @Override
-    public void persistBaseline(Map<String, ResourceAssignment> 
globalBaseline) {
+    public boolean persistBaseline(Map<String, ResourceAssignment> 
globalBaseline) {
+      // If baseline hasn't changed, skip writing to metadata store
+      if (compareAssignments(_globalBaseline, globalBaseline)) {
+        return false;
+      }
       // Update the in-memory reference only
       _globalBaseline = globalBaseline;
+      return true;
     }
 
     @Override
-    public void persistBestPossibleAssignment(
+    public boolean persistBestPossibleAssignment(
         Map<String, ResourceAssignment> bestPossibleAssignment) {
+      // If bestPossibleAssignment hasn't changed, skip writing to metadata 
store
+      if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) 
{
+        return false;
+      }
       // Update the in-memory reference only
       _bestPossibleAssignment = bestPossibleAssignment;
+      return true;
     }
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index 7d05416..72c72a8 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -41,17 +41,19 @@ public class MockAssignmentMetadataStore extends 
AssignmentMetadataStore {
     return _persistGlobalBaseline;
   }
 
-  public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+  public boolean persistBaseline(Map<String, ResourceAssignment> 
globalBaseline) {
     _persistGlobalBaseline = globalBaseline;
+    return true;
   }
 
   public Map<String, ResourceAssignment> getBestPossibleAssignment() {
     return _persistBestPossibleAssignment;
   }
 
-  public void persistBestPossibleAssignment(
+  public boolean persistBestPossibleAssignment(
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     _persistBestPossibleAssignment = bestPossibleAssignment;
+    return true;
   }
 
   public void close() {
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
index f4c875f..db51fd5 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestMixedModeAutoRebalance.java
@@ -48,7 +48,6 @@ import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -60,7 +59,6 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
 
   private final String CLASS_NAME = getShortClassName();
   private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
-  protected static final String DB_NAME = "Test-DB";
 
   private ClusterControllerManager _controller;
   private List<MockParticipantManager> _participants = new ArrayList<>();
@@ -111,13 +109,13 @@ public class TestMixedModeAutoRebalance extends 
ZkTestBase {
     };
   }
 
-  protected void createResource(String stateModel, int numPartition, int 
replica,
+  protected void createResource(String dbName, String stateModel, int 
numPartition, int replica,
       boolean delayEnabled, String rebalanceStrategy) {
     if (delayEnabled) {
-      createResourceWithDelayedRebalance(CLUSTER_NAME, DB_NAME, stateModel, 
numPartition, replica,
+      createResourceWithDelayedRebalance(CLUSTER_NAME, dbName, stateModel, 
numPartition, replica,
           replica - 1, 200, rebalanceStrategy);
     } else {
-      createResourceWithDelayedRebalance(CLUSTER_NAME, DB_NAME, stateModel, 
numPartition, replica,
+      createResourceWithDelayedRebalance(CLUSTER_NAME, dbName, stateModel, 
numPartition, replica,
           replica, 0, rebalanceStrategy);
     }
   }
@@ -125,98 +123,111 @@ public class TestMixedModeAutoRebalance extends 
ZkTestBase {
   @Test(dataProvider = "stateModels")
   public void testUserDefinedPreferenceListsInFullAuto(String stateModel, 
boolean delayEnabled,
       String rebalanceStrateyName) throws Exception {
-    createResource(stateModel, _PARTITIONS, _replica, delayEnabled,
+    String dbName = "Test-DB-" + stateModel;
+    createResource(dbName, stateModel, _PARTITIONS, _replica, delayEnabled,
         rebalanceStrateyName);
-    IdealState idealState =
-        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
DB_NAME);
-    Map<String, List<String>> userDefinedPreferenceLists = 
idealState.getPreferenceLists();
-    List<String> userDefinedPartitions = new ArrayList<>();
-    for (String partition : userDefinedPreferenceLists.keySet()) {
-      List<String> preferenceList = new ArrayList<>();
-      for (int k = _replica; k >= 0; k--) {
-        String instance = _participants.get(k).getInstanceName();
-        preferenceList.add(instance);
+    try {
+      IdealState idealState =
+          
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
dbName);
+      Map<String, List<String>> userDefinedPreferenceLists = 
idealState.getPreferenceLists();
+      List<String> userDefinedPartitions = new ArrayList<>();
+      for (String partition : userDefinedPreferenceLists.keySet()) {
+        List<String> preferenceList = new ArrayList<>();
+        for (int k = _replica; k >= 0; k--) {
+          String instance = _participants.get(k).getInstanceName();
+          preferenceList.add(instance);
+        }
+        userDefinedPreferenceLists.put(partition, preferenceList);
+        userDefinedPartitions.add(partition);
       }
-      userDefinedPreferenceLists.put(partition, preferenceList);
-      userDefinedPartitions.add(partition);
-    }
-
-    ResourceConfig resourceConfig =
-        new 
ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build();
-    _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig);
 
-    // TODO remove this sleep after fix 
https://github.com/apache/helix/issues/526
-    Thread.sleep(500);
+      ResourceConfig resourceConfig =
+          new 
ResourceConfig.Builder(dbName).setPreferenceLists(userDefinedPreferenceLists).build();
+      _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig);
 
-    Assert.assertTrue(_clusterVerifier.verify(3000));
-    verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, 
userDefinedPartitions);
+      // TODO remove this sleep after fix 
https://github.com/apache/helix/issues/526
+      Thread.sleep(500);
 
-    while (userDefinedPartitions.size() > 0) {
-      IdealState originIS = 
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
-          DB_NAME);
-      Set<String> nonUserDefinedPartitions = new 
HashSet<>(originIS.getPartitionSet());
-      nonUserDefinedPartitions.removeAll(userDefinedPartitions);
-
-      removePartitionFromUserDefinedList(DB_NAME, userDefinedPartitions);
-      // TODO: Remove wait once we enable the BestPossibleExternalViewVerifier 
for the WAGED rebalancer.
-      Thread.sleep(1000);
       Assert.assertTrue(_clusterVerifier.verify(3000));
-      verifyUserDefinedPreferenceLists(DB_NAME, userDefinedPreferenceLists, 
userDefinedPartitions);
-      verifyNonUserDefinedAssignment(DB_NAME, originIS, 
nonUserDefinedPartitions);
+      verifyUserDefinedPreferenceLists(dbName, userDefinedPreferenceLists,
+          userDefinedPartitions);
+
+      while (userDefinedPartitions.size() > 0) {
+        IdealState originIS =
+            
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
dbName);
+        Set<String> nonUserDefinedPartitions = new 
HashSet<>(originIS.getPartitionSet());
+        nonUserDefinedPartitions.removeAll(userDefinedPartitions);
+
+        removePartitionFromUserDefinedList(dbName, userDefinedPartitions);
+        // TODO: Remove wait once we enable the 
BestPossibleExternalViewVerifier for the WAGED rebalancer.
+        Thread.sleep(1000);
+        Assert.assertTrue(_clusterVerifier.verify(3000));
+        verifyUserDefinedPreferenceLists(dbName, userDefinedPreferenceLists,
+            userDefinedPartitions);
+        verifyNonUserDefinedAssignment(dbName, originIS, 
nonUserDefinedPartitions);
+      }
+    } finally {
+      _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, 
dbName);
+      _clusterVerifier.verify(5000);
     }
   }
 
   @Test
   public void testUserDefinedPreferenceListsInFullAutoWithErrors() throws 
Exception {
-    createResource(BuiltInStateModelDefinitions.MasterSlave.name(), 5, 
_replica,
-        false, CrushRebalanceStrategy.class.getName());
-
-    IdealState idealState =
-        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
DB_NAME);
-    Map<String, List<String>> userDefinedPreferenceLists = 
idealState.getPreferenceLists();
-
-    List<String> newNodes = new ArrayList<>();
-    for (int i = NUM_NODE; i < NUM_NODE + _replica; i++) {
-      String instance = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
-
-      // start dummy participants
-      MockParticipantManager participant =
-          new TestMockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance);
-      participant.syncStart();
-      _participants.add(participant);
-      newNodes.add(instance);
-    }
-
-    List<String> userDefinedPartitions = new ArrayList<>();
-    for (String partition : userDefinedPreferenceLists.keySet()) {
-      userDefinedPreferenceLists.put(partition, newNodes);
-      userDefinedPartitions.add(partition);
-    }
+    String dbName = "Test-DB-withErrors";
+    createResource(dbName, BuiltInStateModelDefinitions.MasterSlave.name(), 5, 
_replica, false,
+        CrushRebalanceStrategy.class.getName());
+    try {
+      IdealState idealState =
+          
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
dbName);
+      Map<String, List<String>> userDefinedPreferenceLists = 
idealState.getPreferenceLists();
+
+      List<String> newNodes = new ArrayList<>();
+      for (int i = NUM_NODE; i < NUM_NODE + _replica; i++) {
+        String instance = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+        _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+
+        // start dummy participants
+        MockParticipantManager participant =
+            new TestMockParticipantManager(ZK_ADDR, CLUSTER_NAME, instance);
+        participant.syncStart();
+        _participants.add(participant);
+        newNodes.add(instance);
+      }
 
-    ResourceConfig resourceConfig =
-        new 
ResourceConfig.Builder(DB_NAME).setPreferenceLists(userDefinedPreferenceLists).build();
-    _configAccessor.setResourceConfig(CLUSTER_NAME, DB_NAME, resourceConfig);
+      List<String> userDefinedPartitions = new ArrayList<>();
+      for (String partition : userDefinedPreferenceLists.keySet()) {
+        userDefinedPreferenceLists.put(partition, newNodes);
+        userDefinedPartitions.add(partition);
+      }
 
-    TestHelper.verify(() -> {
-      ExternalView ev =
-          
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
DB_NAME);
-      if (ev != null) {
-        for (String partition : ev.getPartitionSet()) {
-          Map<String, String> stateMap = ev.getStateMap(partition);
-          if (stateMap.values().contains("ERROR")) {
-            return true;
+      ResourceConfig resourceConfig =
+          new 
ResourceConfig.Builder(dbName).setPreferenceLists(userDefinedPreferenceLists).build();
+      _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig);
+
+      TestHelper.verify(() -> {
+        ExternalView ev =
+            
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
dbName);
+        if (ev != null) {
+          for (String partition : ev.getPartitionSet()) {
+            Map<String, String> stateMap = ev.getStateMap(partition);
+            if (stateMap.values().contains("ERROR")) {
+              return true;
+            }
           }
         }
-      }
-      return false;
-    }, 2000);
-
-    ExternalView ev =
-        
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
DB_NAME);
-    IdealState is = 
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
-        DB_NAME);
-    validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+        return false;
+      }, 2000);
+
+      ExternalView ev =
+          
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, 
dbName);
+      IdealState is =
+          
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
dbName);
+      validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE);
+    } finally {
+      _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, 
dbName);
+      _clusterVerifier.verify(5000);
+    }
   }
 
   private void verifyUserDefinedPreferenceLists(String db,
@@ -260,12 +271,6 @@ public class TestMixedModeAutoRebalance extends ZkTestBase {
     _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
   }
 
-  @AfterMethod
-  public void afterMethod() {
-    _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, DB_NAME);
-    _clusterVerifier.verify(5000);
-  }
-
   @AfterClass
   public void afterClass() throws Exception {
     /**
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
index eb9e0f8..c482e8f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestMixedModeWagedRebalance.java
@@ -39,20 +39,20 @@ public class TestMixedModeWagedRebalance extends TestMixedModeAutoRebalance {
     };
   }
 
-  protected void createResource(String stateModel, int numPartition,
-      int replica, boolean delayEnabled, String rebalanceStrategy) {
+  protected void createResource(String dbName, String stateModel, int numPartition, int replica,
+      boolean delayEnabled, String rebalanceStrategy) {
     if (delayEnabled) {
       setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 200);
-      createResourceWithWagedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica,
+      createResourceWithWagedRebalance(CLUSTER_NAME, dbName, stateModel, numPartition, replica,
           replica - 1);
     } else {
-      createResourceWithWagedRebalance(CLUSTER_NAME, DB_NAME, stateModel, numPartition, replica, replica);
+      createResourceWithWagedRebalance(CLUSTER_NAME, dbName, stateModel, numPartition, replica,
+          replica);
     }
   }
 
   @AfterMethod
   public void afterMethod() {
-    super.afterMethod();
     setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1);
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
similarity index 92%
rename from helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
rename to helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
index 37e76ee..156c855 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
@@ -1,4 +1,4 @@
-package org.apache.helix.integration.rebalancer.PartitionMigration;
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -22,6 +22,7 @@ package org.apache.helix.integration.rebalancer.PartitionMigration;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.helix.integration.rebalancer.PartitionMigration.TestExpandCluster;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 

Reply via email to