This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new a901c20 Don't skip WAGED rebalancer calculation even if the resource
list is empty. (#1271)
a901c20 is described below
commit a901c201a00d4b96294020ef82bc113fea196e8a
Author: Jiajun Wang <[email protected]>
AuthorDate: Fri Aug 14 16:47:57 2020 -0700
Don't skip WAGED rebalancer calculation even if the resource list is empty.
(#1271)
This change allows the rebalancer logic to clean up the internal
assignment state and cache when all the resources are removed from the cluster.
---
.../rebalancer/waged/WagedRebalancer.java | 22 -------
.../stages/BestPossibleStateCalcStage.java | 2 +-
.../rebalancer/waged/TestWagedRebalancer.java | 3 +
.../WagedRebalancer/TestWagedRebalance.java | 70 ++++++++++++++++++++--
4 files changed, 68 insertions(+), 29 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index e1f5d4a..89ecc47 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -253,16 +253,6 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
public Map<String, IdealState>
computeNewIdealStates(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, final CurrentStateOutput
currentStateOutput)
throws HelixRebalanceException {
- if (resourceMap.isEmpty()) {
- LOG.debug(
- "There is no resource to be rebalanced by {}. Reset the persisted
assignment state if any.",
- this.getClass().getSimpleName());
- // Clean up the persisted assignment records so if the resources are
added back to WAGED, they
- // will be recalculated as a new one.
- clearAssignmentMetadata();
- return Collections.emptyMap();
- }
-
LOG.info("Start computing new ideal states for resources: {}",
resourceMap.keySet().toString());
validateInput(clusterData, resourceMap);
@@ -468,18 +458,6 @@ public class WagedRebalancer implements
StatefulRebalancer<ResourceControllerDat
}
}
- private void clearAssignmentMetadata() {
- if (_assignmentMetadataStore != null) {
- try {
- _writeLatency.startMeasuringLatency();
- _assignmentMetadataStore.clearAssignmentMetadata();
- _writeLatency.endMeasuringLatency();
- } catch (Exception ex) {
- LOG.error("Failed to clear the assignment metadata.", ex);
- }
- }
- }
-
/**
* Calculate and update the Baseline assignment
* @param clusterModel
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 32d3b42..bca7a37 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -292,7 +292,7 @@ public class BestPossibleStateCalcStage extends
AbstractBaseStage {
} else {
failureResources.add(resource.getResourceName());
LogUtil.logWarn(logger, _eventId, String
- .format("Failed to calculate best possible states for %s.",
+ .format("The calculated best possible states for %s is empty or
invalid.",
resource.getResourceName()));
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index fe5fb02..cfcbe29 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -150,6 +150,9 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
Assert.assertFalse(_metadataStore.getBaseline().isEmpty());
Assert.assertFalse(_metadataStore.getBestPossibleAssignment().isEmpty());
// Calculate with empty resource list. The rebalancer shall clean up all
the assignment status.
+ when(clusterData.getRefreshedChangeTypes())
+
.thenReturn(Collections.singleton(HelixConstants.ChangeType.IDEAL_STATE));
+ clusterData.getIdealStates().clear();
newIdealStates = rebalancer
.computeNewIdealStates(clusterData, Collections.emptyMap(), new
CurrentStateOutput());
Assert.assertTrue(newIdealStates.isEmpty());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index c232462..b778b3c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -36,9 +36,11 @@ import org.apache.helix.common.ZkTestBase;
import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
@@ -66,6 +68,7 @@ public class TestWagedRebalance extends ZkTestBase {
protected final String CLASS_NAME = getShortClassName();
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
protected ClusterControllerManager _controller;
+ protected AssignmentMetadataStore _assignmentMetadataStore;
List<MockParticipantManager> _participants = new ArrayList<>();
Map<String, String> _nodeToTagMap = new HashMap<>();
@@ -103,6 +106,23 @@ public class TestWagedRebalance extends ZkTestBase {
_controller.syncStart();
enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+
+ // It's a hacky way to workaround the package restriction. Note that we
still want to hide the
+ // AssignmentMetadataStore constructor to prevent unexpected update to the
assignment records.
+ _assignmentMetadataStore =
+ new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR),
CLUSTER_NAME) {
+ public Map<String, ResourceAssignment> getBaseline() {
+ // Ensure this metadata store always read from the ZK without
using cache.
+ super.reset();
+ return super.getBaseline();
+ }
+
+ public synchronized Map<String, ResourceAssignment>
getBestPossibleAssignment() {
+ // Ensure this metadata store always read from the ZK without
using cache.
+ super.reset();
+ return super.getBestPossibleAssignment();
+ }
+ };
}
protected void addInstanceConfig(String storageNodeName, int seqNo, int
tagCount) {
@@ -118,7 +138,8 @@ public class TestWagedRebalance extends ZkTestBase {
int i = 0;
for (String stateModel : _testModels) {
String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
- createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica, _replica);
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica,
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
@@ -315,8 +336,7 @@ public class TestWagedRebalance extends ZkTestBase {
Thread.sleep(300);
validate(newReplicaFactor);
- ev =
-
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
dbName);
+ ev =
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
dbName);
Assert.assertEquals(ev.getPartitionSet().size(), PARTITIONS + 2);
}
@@ -412,7 +432,6 @@ public class TestWagedRebalance extends ZkTestBase {
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, p.getInstanceName(), false);
_gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
-
}
int j = 0;
@@ -490,7 +509,8 @@ public class TestWagedRebalance extends ZkTestBase {
IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
idealState.setMaxPartitionsPerInstance(1);
-
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db,
idealState);
+ _gSetupTool.getClusterManagementTool()
+ .setResourceIdealState(CLUSTER_NAME, db, idealState);
}
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
@@ -546,7 +566,8 @@ public class TestWagedRebalance extends ZkTestBase {
int i = 0;
for (String stateModel : _testModels) {
String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
- createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica, _replica);
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica,
+ _replica);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
_allDBs.add(db);
}
@@ -653,6 +674,36 @@ public class TestWagedRebalance extends ZkTestBase {
Assert.assertFalse(newEV.equals(oldEV));
}
+ @Test(dependsOnMethods = "test")
+ public void testRecreateSameResource() throws InterruptedException {
+ String dbName = "Test-DB-" + TestHelper.getTestMethodName();
+ createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica,
_replica);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+ _allDBs.add(dbName);
+ // waiting for the DBs being dropped.
+ Thread.sleep(300);
+ validate(_replica);
+
+ // Record the current Ideal State and Resource Config for recreating.
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
dbName);
+
+ // Drop preserved the DB.
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, dbName);
+ _allDBs.remove(dbName);
+ // waiting for the DBs being dropped.
+ Thread.sleep(100);
+ validate(_replica);
+
+ // Recreate the DB.
+ _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName,
is);
+ _allDBs.add(dbName);
+ // waiting for the DBs to be recreated.
+ Thread.sleep(100);
+ validate(_replica);
+ }
+
private void validate(int expectedReplica) {
HelixClusterVerifier _clusterVerifier =
new
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
@@ -665,6 +716,7 @@ public class TestWagedRebalance extends ZkTestBase {
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
validateIsolation(is, ev, expectedReplica);
}
+ _clusterVerifier.close();
}
/**
@@ -702,6 +754,12 @@ public class TestWagedRebalance extends ZkTestBase {
} finally {
_clusterVerifier.close();
}
+
+ // Verify the DBs are all removed and the persisted assignment records are
cleaned up.
+ Assert.assertEquals(
+
_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME).size(),
0);
+
Assert.assertTrue(_assignmentMetadataStore.getBestPossibleAssignment().isEmpty());
+ Assert.assertTrue(_assignmentMetadataStore.getBaseline().isEmpty());
}
@AfterClass