This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 3a7dbb301 Only check FULL_AUTO resources when determining if the
evacuation is complete (#2743)
3a7dbb301 is described below
commit 3a7dbb301973ded817acfccd7cd116356259fe2c
Author: Zachary Pinto <[email protected]>
AuthorDate: Thu Jan 25 20:36:51 2024 -0800
Only check FULL_AUTO resources when determining if the evacuation is
complete (#2743)
Evacuation is not supported for SEMI_AUTO resources. We are fixing the API
to only consider the CURRENTSTATE of FULL_AUTO resources when checking whether
evacuation has finished.
---
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 15 +++++---
.../rebalancer/TestInstanceOperation.java | 13 +++++++
.../helix/rest/server/TestPerInstanceAccessor.java | 41 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 4 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 8a8d13b7c..aa94a0244 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -576,7 +576,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public boolean isEvacuateFinished(String clusterName, String instanceName) {
- if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) {
+ if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
return config != null &&
config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.EVACUATE.name());
}
@@ -838,7 +838,7 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public boolean isReadyForPreparingJoiningCluster(String clusterName, String
instanceName) {
- if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) {
+ if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
return config != null &&
DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
config.getInstanceOperation());
@@ -853,7 +853,8 @@ public class ZKHelixAdmin implements HelixAdmin {
* @param instanceName
* @return
*/
- private boolean instanceHasCurrentStateOrMessage(String clusterName, String
instanceName) {
+ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName,
+ String instanceName) {
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -892,7 +893,13 @@ public class ZKHelixAdmin implements HelixAdmin {
return true;
}
- return !currentStates.isEmpty();
+ // Get set of FULL_AUTO resources
+ List<IdealState> idealStates =
accessor.getChildValues(keyBuilder.idealStates(), true);
+ Set<String> fullAutoResources = idealStates != null ? idealStates.stream()
+ .filter(idealState -> idealState.getRebalanceMode() ==
RebalanceMode.FULL_AUTO)
+ .map(IdealState::getResourceName).collect(Collectors.toSet()) :
Collections.emptySet();
+
+ return currentStates.stream().anyMatch(fullAutoResources::contains);
}
@Override
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index bf6db2900..f42796fc5 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -34,6 +34,7 @@ import
org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
@@ -223,6 +224,14 @@ public class TestInstanceOperation extends ZkTestBase {
@Test
public void testEvacuate() throws Exception {
System.out.println("START TestInstanceOperation.testEvacuate() at " + new
Date(System.currentTimeMillis()));
+
+ // Add semi-auto DBs
+ String semiAutoDB = "SemiAutoTestDB_1";
+ createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, semiAutoDB,
+
_participants.stream().map(ZKHelixManager::getInstanceName).collect(Collectors.toList()),
+ BuiltInStateModelDefinitions.OnlineOffline.name(), 1,
_participants.size());
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
// EV should contain all participants, check resources one by one
Map<String, ExternalView> assignment = getEVs();
for (String resource : _allDBs) {
@@ -249,6 +258,10 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME,
instanceToEvacuate));
Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME,
instanceToEvacuate));
+
+ // Drop semi-auto DBs
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, semiAutoDB);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
}
@Test(dependsOnMethods = "testEvacuate")
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index a1f46cce9..48343f8d0 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -36,18 +36,23 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.TestHelper;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.rest.server.resources.AbstractResource;
import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
import org.apache.helix.rest.server.resources.helix.PerInstanceAccessor;
import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -522,6 +527,29 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
Assert.assertFalse((boolean) responseMap.get("successful"));
// test isEvacuateFinished on instance with EVACUATE but has currentState
+ // Enable persist best possible assignment for cluster verifier
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ ClusterConfig clusterConfig =
configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.setPersistBestPossibleAssignment(true);
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ Set<String> resources = _resourcesMap.get(CLUSTER_NAME);
+ ZkHelixClusterVerifier clusterVerifier = new
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setDeactivatedNodeAwareness(true)
+ .setResources(resources)
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+ .build();
+ // Make the DBs FULL_AUTO and wait because EVACUATE is only supported for
FULL_AUTO resources
+ for (String resource : resources) {
+ IdealState idealState =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
resource);
+ idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+ idealState.setDelayRebalanceEnabled(true);
+ idealState.setRebalanceDelay(360000);
+
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
resource, idealState);
+ }
+
+ Assert.assertTrue(clusterVerifier.verifyByPolling());
+
new
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME,
INSTANCE_NAME);
@@ -534,6 +562,19 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
Assert.assertEquals(response.getStatus(),
Response.Status.OK.getStatusCode());
Assert.assertFalse(evacuateFinishedresult.get("successful"));
+ // Make all resources SEMI_AUTO again
+ for (String resource : resources) {
+ IdealState idealState =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
resource);
+ idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
+ idealState.setDelayRebalanceEnabled(false);
+ idealState.setRebalanceDelay(0);
+
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
resource, idealState);
+ }
+
+ // Wait for the cluster to be stable again
+ Assert.assertTrue(clusterVerifier.verifyByPolling());
+
// test isEvacuateFinished on instance with EVACUATE and no currentState
// Create new instance so no currentState or messages assigned to it
String test_instance_name = INSTANCE_NAME + "_foo";