This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 1d47d6b51 [apache/helix] -- Add SetPartitionToError for participants
to self annotate a node to ERROR state (#2792)
1d47d6b51 is described below
commit 1d47d6b51ee27764008685cf6ec76d4108915807
Author: Charanya Sudharsanan <[email protected]>
AuthorDate: Tue May 7 21:44:40 2024 -0700
[apache/helix] -- Add SetPartitionToError for participants to self annotate
a node to ERROR state (#2792)
Co-authored-by: Charanya Sudharsanan <[email protected]>
What: An API endpoint that validates the incoming request and sends a state
transition message to set one or more partitions from any current state to
the ERROR state.
Why: Currently, participants are unable to explicitly set a partition to the
ERROR state when it seems to be stuck in a specific current state. The only
way a replica can be set to ERROR is from within a state model. Having an
endpoint that allows this lets clients first move the stuck replica to ERROR
and then call the resetPartition endpoint to set it back to the INIT state and
recover the replica; resetPartition works only on partitions in ERROR state.
---
.../src/main/java/org/apache/helix/HelixAdmin.java | 12 +
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 256 ++++++++++++---------
.../handling/HelixStateTransitionHandler.java | 3 +-
.../apache/helix/messaging/handling/HelixTask.java | 1 +
.../helix/participant/statemachine/StateModel.java | 11 +
.../java/org/apache/helix/tools/ClusterSetup.java | 21 ++
.../integration/TestSetPartitionsToErrorState.java | 99 ++++++++
.../apache/helix/manager/zk/TestZkHelixAdmin.java | 119 +++++++++-
.../java/org/apache/helix/mock/MockHelixAdmin.java | 6 +
.../rest/server/resources/AbstractResource.java | 3 +-
.../resources/helix/PerInstanceAccessor.java | 10 +
.../helix/rest/server/TestPerInstanceAccessor.java | 43 +++-
12 files changed, 458 insertions(+), 126 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index d2e0c2681..84a7154b1 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -421,6 +421,18 @@ public interface HelixAdmin {
*/
ClusterManagementMode getClusterManagementMode(String clusterName);
+ /**
+ * Set a list of partitions for an instance to ERROR state from any state.
+ * The partitions could be in any state and setPartitionsToError will bring
them to ERROR
+ * state. ANY to ERROR state transition is required for this.
+ * @param clusterName
+ * @param instanceName
+ * @param resourceName
+ * @param partitionNames
+ */
+ void setPartitionsToError(String clusterName, String instanceName, String
resourceName,
+ List<String> partitionNames);
+
/**
* Reset a list of partitions in error state for an instance
* The partitions are assume to be in error state and reset will bring them
from error
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index c7fe0861b..8c873b4cd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -1035,6 +1035,136 @@ public class ZKHelixAdmin implements HelixAdmin {
: new ClusterManagementMode(status.getManagementMode(),
status.getManagementModeStatus());
}
+ @Override
+ public void setPartitionsToError(String clusterName, String instanceName,
String resourceName,
+ List<String> partitionNames) {
+ logger.info("Set partitions {} for resource {} on instance {} in cluster
{} to ERROR state.",
+ partitionNames == null ? "NULL" :
HelixUtil.serializeByComma(partitionNames), resourceName,
+ instanceName, clusterName);
+ sendStateTransitionMessage(clusterName, instanceName, resourceName,
partitionNames,
+ StateTransitionType.SET_TO_ERROR);
+ }
+
+ private void sendStateTransitionMessage(String clusterName, String
instanceName,
+ String resourceName, List<String> partitionNames, StateTransitionType
stateTransitionType) {
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ // check the instance is alive
+ LiveInstance liveInstance =
accessor.getProperty(keyBuilder.liveInstance(instanceName));
+ if (liveInstance == null) {
+ // check if the instance exists in the cluster
+ String instanceConfigPath =
PropertyPathBuilder.instanceConfig(clusterName, instanceName);
+ throw new HelixException(String.format(
+ (_zkClient.exists(instanceConfigPath) ?
SetPartitionFailureReason.INSTANCE_NOT_ALIVE
+ :
SetPartitionFailureReason.INSTANCE_NON_EXISTENT).getMessage(resourceName,
+ partitionNames, instanceName, instanceName, clusterName,
stateTransitionType)));
+ }
+
+ // check resource exists in ideal state
+ IdealState idealState =
accessor.getProperty(keyBuilder.idealStates(resourceName));
+ if (idealState == null) {
+ throw new HelixException(
+
String.format(SetPartitionFailureReason.RESOURCE_NON_EXISTENT.getMessage(resourceName,
+ partitionNames, instanceName, resourceName, clusterName,
stateTransitionType)));
+ }
+
+ // check partition exists in resource
+ Set<String> partitionsNames = new HashSet<String>(partitionNames);
+ Set<String> partitions = (idealState.getRebalanceMode() ==
RebalanceMode.CUSTOMIZED)
+ ? idealState.getRecord().getMapFields().keySet()
+ : idealState.getRecord().getListFields().keySet();
+ if (!partitions.containsAll(partitionsNames)) {
+ throw new HelixException(
+
String.format(SetPartitionFailureReason.PARTITION_NON_EXISTENT.getMessage(resourceName,
+ partitionNames, instanceName, partitionNames.toString(),
clusterName, stateTransitionType)));
+ }
+
+ // for a RESET transition, check that every partition is currently in ERROR state
+ String sessionId = liveInstance.getEphemeralOwner();
+ CurrentState curState =
+ accessor.getProperty(keyBuilder.currentState(instanceName, sessionId,
resourceName));
+ if (stateTransitionType.equals(StateTransitionType.RESET)) {
+ for (String partitionName : partitionNames) {
+ if
(!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString())) {
+ throw new HelixException(String.format(
+
SetPartitionFailureReason.PARTITION_NOT_ERROR.getMessage(resourceName,
partitionNames,
+ instanceName, partitionNames.toString(), clusterName,
stateTransitionType)));
+ }
+ }
+ }
+
+ // check stateModelDef exists
+ String stateModelDef = idealState.getStateModelDefRef();
+ StateModelDefinition stateModel =
accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
+ if (stateModel == null) {
+ throw new HelixException(
+
String.format(SetPartitionFailureReason.STATE_MODEL_NON_EXISTENT.getMessage(resourceName,
+ partitionNames, instanceName, stateModelDef, clusterName,
stateTransitionType)));
+ }
+
+ // check that no pending state transition messages exist for the partitions
+ List<Message> messages =
accessor.getChildValues(keyBuilder.messages(instanceName), true);
+ for (Message message : messages) {
+ if
(!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
+ || !sessionId.equals(message.getTgtSessionId())
+ || !resourceName.equals(message.getResourceName())
+ || !partitionsNames.contains(message.getPartitionName())) {
+ continue;
+ }
+
+ throw new HelixException(String.format(
+ "Can't %s state for %s.%s on %s, because a pending message %s exists
for resource %s",
+ stateTransitionType.name(), resourceName, partitionNames,
instanceName, message,
+ message.getResourceName()));
+ }
+
+ String adminName = null;
+ try {
+ adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
+ } catch (UnknownHostException e) {
+ logger.info("Unable to get host name. Will set it to UNKNOWN, mostly
ignorable", e);
+ adminName = "UNKNOWN";
+ }
+
+ List<Message> stateTransitionMessages = new ArrayList<Message>();
+ List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
+ for (String partitionName : partitionNames) {
+ String msgId = UUID.randomUUID().toString();
+ Message message = new Message(MessageType.STATE_TRANSITION, msgId);
+ message.setSrcName(adminName);
+ message.setTgtName(instanceName);
+ message.setMsgState(MessageState.NEW);
+ message.setPartitionName(partitionName);
+ message.setResourceName(resourceName);
+ message.setTgtSessionId(sessionId);
+ message.setStateModelDef(stateModelDef);
+ message.setStateModelFactoryName(idealState.getStateModelFactoryName());
+ // for RESET, send an ERROR to initialState message
+ // for SET_TO_ERROR, send a * to ERROR state message
+ if (stateTransitionType.equals(StateTransitionType.RESET)) {
+ message.setFromState(HelixDefinedState.ERROR.toString());
+ message.setToState(stateModel.getInitialState());
+ }
+ if (stateTransitionType.equals(StateTransitionType.SET_TO_ERROR)) {
+ message.setFromState("*");
+ message.setToState(HelixDefinedState.ERROR.toString());
+ }
+ if (idealState.getResourceGroupName() != null) {
+ message.setResourceGroupName(idealState.getResourceGroupName());
+ }
+ if (idealState.getInstanceGroupTag() != null) {
+ message.setResourceTag(idealState.getInstanceGroupTag());
+ }
+
+ stateTransitionMessages.add(message);
+ messageKeys.add(keyBuilder.message(instanceName, message.getId()));
+ }
+
+ accessor.setChildren(messageKeys, stateTransitionMessages);
+ }
+
private void enableClusterPauseMode(String clusterName, boolean
cancelPendingST, String reason) {
String hostname = NetworkUtil.getLocalhostName();
logger.info(
@@ -1180,7 +1310,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
}
- private enum ResetPartitionFailureReason {
+ private enum SetPartitionFailureReason {
INSTANCE_NOT_ALIVE("%s is not alive in cluster %s"),
INSTANCE_NON_EXISTENT("%s does not exist in cluster %s"),
RESOURCE_NON_EXISTENT("resource %s is not added to cluster %s"),
@@ -1190,129 +1320,33 @@ public class ZKHelixAdmin implements HelixAdmin {
private String message;
- ResetPartitionFailureReason(String message) {
+ SetPartitionFailureReason(String message) {
this.message = message;
}
public String getMessage(String resourceName, List<String> partitionNames,
String instanceName,
- String errorStateEntity, String clusterName) {
- return String.format("Can't reset state for %s.%s on %s, because " +
message, resourceName,
- partitionNames, instanceName, errorStateEntity, clusterName);
+ String errorStateEntity, String clusterName, StateTransitionType
stateTransitionType) {
+ return String.format("Can't %s state for %s.%s on %s, because " +
message,
+ stateTransitionType.name(), resourceName, partitionNames,
instanceName, errorStateEntity,
+ clusterName);
}
}
+ private enum StateTransitionType {
+ // sets state from ERROR to INIT.
+ RESET,
+ // sets state from ANY to ERROR.
+ SET_TO_ERROR,
+ // Unknown StateTransitionType
+ UNDEFINED
+ }
@Override
public void resetPartition(String clusterName, String instanceName, String
resourceName,
List<String> partitionNames) {
logger.info("Reset partitions {} for resource {} on instance {} in cluster
{}.",
partitionNames == null ? "NULL" :
HelixUtil.serializeByComma(partitionNames), resourceName,
instanceName, clusterName);
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(_zkClient));
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
- // check the instance is alive
- LiveInstance liveInstance =
accessor.getProperty(keyBuilder.liveInstance(instanceName));
- if (liveInstance == null) {
- // check if the instance exists in the cluster
- String instanceConfigPath =
PropertyPathBuilder.instanceConfig(clusterName, instanceName);
- throw new HelixException(String.format(
- (_zkClient.exists(instanceConfigPath) ?
ResetPartitionFailureReason.INSTANCE_NOT_ALIVE
- : ResetPartitionFailureReason.INSTANCE_NON_EXISTENT)
- .getMessage(resourceName, partitionNames, instanceName,
instanceName, clusterName)));
- }
-
- // check resource group exists
- IdealState idealState =
accessor.getProperty(keyBuilder.idealStates(resourceName));
- if (idealState == null) {
- throw new
HelixException(String.format(ResetPartitionFailureReason.RESOURCE_NON_EXISTENT
- .getMessage(resourceName, partitionNames, instanceName,
resourceName, clusterName)));
- }
-
- // check partition exists in resource group
- Set<String> resetPartitionNames = new HashSet<String>(partitionNames);
- Set<String> partitions =
- (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) ?
idealState.getRecord()
- .getMapFields().keySet() :
idealState.getRecord().getListFields().keySet();
- if (!partitions.containsAll(resetPartitionNames)) {
- throw new
HelixException(String.format(ResetPartitionFailureReason.PARTITION_NON_EXISTENT
- .getMessage(resourceName, partitionNames, instanceName,
partitionNames.toString(),
- clusterName)));
- }
-
- // check partition is in ERROR state
- String sessionId = liveInstance.getEphemeralOwner();
- CurrentState curState =
- accessor.getProperty(keyBuilder.currentState(instanceName, sessionId,
resourceName));
- for (String partitionName : resetPartitionNames) {
- if
(!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString())) {
- throw new
HelixException(String.format(ResetPartitionFailureReason.PARTITION_NOT_ERROR
- .getMessage(resourceName, partitionNames, instanceName,
partitionNames.toString(),
- clusterName)));
- }
- }
-
- // check stateModelDef exists and get initial state
- String stateModelDef = idealState.getStateModelDefRef();
- StateModelDefinition stateModel =
accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
- if (stateModel == null) {
- throw new
HelixException(String.format(ResetPartitionFailureReason.STATE_MODEL_NON_EXISTENT
- .getMessage(resourceName, partitionNames, instanceName,
stateModelDef, clusterName)));
- }
-
- // check there is no pending messages for the partitions exist
- List<Message> messages =
accessor.getChildValues(keyBuilder.messages(instanceName), true);
- for (Message message : messages) {
- if
(!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) ||
!sessionId
- .equals(message.getTgtSessionId()) ||
!resourceName.equals(message.getResourceName())
- || !resetPartitionNames.contains(message.getPartitionName())) {
- continue;
- }
-
- throw new HelixException(String.format(
- "Can't reset state for %s.%s on %s, because a pending message %s
exists for resource %s",
- resourceName, partitionNames, instanceName, message.toString(),
- message.getResourceName()));
- }
-
- String adminName = null;
- try {
- adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
- } catch (UnknownHostException e) {
- // can ignore it
- logger.info("Unable to get host name. Will set it to UNKNOWN, mostly
ignorable", e);
- adminName = "UNKNOWN";
- }
-
- List<Message> resetMessages = new ArrayList<Message>();
- List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
- for (String partitionName : resetPartitionNames) {
- // send ERROR to initialState message
- String msgId = UUID.randomUUID().toString();
- Message message = new Message(MessageType.STATE_TRANSITION, msgId);
- message.setSrcName(adminName);
- message.setTgtName(instanceName);
- message.setMsgState(MessageState.NEW);
- message.setPartitionName(partitionName);
- message.setResourceName(resourceName);
- message.setTgtSessionId(sessionId);
- message.setStateModelDef(stateModelDef);
- message.setFromState(HelixDefinedState.ERROR.toString());
- message.setToState(stateModel.getInitialState());
- message.setStateModelFactoryName(idealState.getStateModelFactoryName());
-
- if (idealState.getResourceGroupName() != null) {
- message.setResourceGroupName(idealState.getResourceGroupName());
- }
- if (idealState.getInstanceGroupTag() != null) {
- message.setResourceTag(idealState.getInstanceGroupTag());
- }
-
- resetMessages.add(message);
- messageKeys.add(keyBuilder.message(instanceName, message.getId()));
- }
-
- accessor.setChildren(messageKeys, resetMessages);
+ sendStateTransitionMessage(clusterName, instanceName, resourceName,
partitionNames, StateTransitionType.RESET);
}
@Override
diff --git
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 0d67ced4b..0a91370b0 100644
---
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -176,7 +176,8 @@ public class HelixStateTransitionHandler extends
MessageHandler {
deltaList.add(delta);
_currentStateDelta.setDeltaList(deltaList);
_stateModelFactory.removeStateModel(_message.getResourceName(),
partitionKey);
- } else if
(_stateModel.getCurrentState().equals(_message.getFromState())) {
+ } else if (_message.getFromState().equals("*")
+ || _stateModel.getCurrentState().equals(_message.getFromState())) {
// if the partition is not to be dropped, update _stateModel to the
TO_STATE
// need this check because TaskRunner may change _stateModel before
reach here.
_stateModel.updateState(toState);
diff --git
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 4470b9996..6a9473eba 100644
---
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -323,6 +323,7 @@ public class HelixTask implements MessageTask {
String fromState = message.getFromState();
String toState = message.getToState();
String transition = fromState + "--" + toState;
+ transition = transition.replaceAll("\\*", "ANY");
StateTransitionContext cxt =
new StateTransitionContext(manager.getClusterName(),
manager.getInstanceName(),
diff --git
a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
index 143c14ade..5bb2a19c8 100644
---
a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
+++
b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
@@ -115,4 +115,15 @@ public abstract class StateModel {
public boolean isCancelled() {
return _cancelled;
}
+
+ /*
+ * default transition to set partition in any state to error state
+ * @param message
+ * @param context
+ * @throws InterruptedException
+ */
+ @Transition(to = "ERROR", from = "*")
+ public void onBecomeErrorFromAny(Message message, NotificationContext
context) throws Exception {
+ logger.info("Default *->ERROR transition invoked.");
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 633ce0341..a9578632b 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -134,6 +134,9 @@ public class ClusterSetup {
public static final String resetInstance = "resetInstance";
public static final String resetResource = "resetResource";
+ // set partitions to ERROR
+ public static final String setPartitionsToError = "setPartitionsToError";
+
// help
public static final String help = "help";
@@ -1114,6 +1117,13 @@ public class ClusterSetup {
removeCloudConfigOption.setRequired(false);
removeCloudConfigOption.setArgName("clusterName");
+ Option setPartitionsToErrorOption =
+ OptionBuilder.withLongOpt(setPartitionsToError)
+ .withDescription("Set a Partition to Error State").create();
+ setPartitionsToErrorOption.setArgs(4);
+ setPartitionsToErrorOption.setRequired(false);
+ setPartitionsToErrorOption.setArgName("clusterName instanceName
resourceName partitionName");
+
OptionGroup group = new OptionGroup();
group.setRequired(true);
group.addOption(rebalanceOption);
@@ -1153,6 +1163,7 @@ public class ClusterSetup {
group.addOption(listStateModelOption);
group.addOption(addResourcePropertyOption);
group.addOption(removeResourcePropertyOption);
+ group.addOption(setPartitionsToErrorOption);
// set/get/remove config options
group.addOption(setConfOption);
@@ -1561,6 +1572,16 @@ public class ClusterSetup {
String newInstanceName = cmd.getOptionValues(swapInstance)[2];
setupTool.swapInstance(clusterName, oldInstanceName, newInstanceName);
+ } else if (cmd.hasOption(setPartitionsToError)) {
+ String[] args = cmd.getOptionValues(setPartitionsToError);
+
+ String clusterName = args[0];
+ String instanceName = args[1];
+ String resourceName = args[2];
+ List<String> partitionNames = Arrays.asList(Arrays.copyOfRange(args, 3,
args.length));
+
+ setupTool.getClusterManagementTool().setPartitionsToError(clusterName,
instanceName, resourceName, partitionNames);
+ return 0;
}
// set/get/remove config options
else if (cmd.hasOption(setConfig)) {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/TestSetPartitionsToErrorState.java
b/helix-core/src/test/java/org/apache/helix/integration/TestSetPartitionsToErrorState.java
new file mode 100644
index 000000000..5b13703b6
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/TestSetPartitionsToErrorState.java
@@ -0,0 +1,99 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSetPartitionsToErrorState extends ZkTestBase {
+
+ @Test()
+ public void testSetPartitionsToErrorState() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+
+ System.out.println("START " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ n, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+ controller.syncStart();
+
+ // start mock participants
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName,
instanceName);
+ participants[i].syncStart();
+ }
+
+ // verify cluster
+ HashMap<String, Map<String, String>> errStateMap = new HashMap<>();
+ errStateMap.put("TestDB0", new HashMap<>());
+ boolean result = ClusterStateVerifier.verifyByZkCallback(
+ (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName, errStateMap)));
+ Assert.assertTrue(result, "Cluster verification fails");
+
+ // set a non exist partition to ERROR, should throw exception
+ try {
+ String command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " +
clusterName
+ + " localhost_12918 TestDB0 TestDB0_nonExist";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ Assert.fail("Should throw exception on setting a non-exist partition to
error");
+ } catch (Exception e) {
+ // OK
+ }
+
+ // set one partition not in ERROR state to ERROR
+ String command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " +
clusterName
+ + " localhost_12918 TestDB0 TestDB0_4";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
+ result = ClusterStateVerifier.verifyByZkCallback(
+ (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName, errStateMap)));
+ Assert.assertTrue(result, "Cluster verification fails");
+
+ // set another partition not in ERROR state to ERROR
+ command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " + clusterName
+ + " localhost_12918 TestDB0 TestDB0_7";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ errStateMap.get("TestDB0").put("TestDB0_7", "localhost_12918");
+ result = ClusterStateVerifier.verifyByZkCallback(
+ (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName, errStateMap)));
+ Assert.assertTrue(result, "Cluster verification fails");
+
+ // setting a partition already in ERROR state to ERROR - message does not
get processed
+ command = "--zkSvr " + ZK_ADDR + " --setPartitionsToError " + clusterName
+ + " localhost_12918 TestDB0 TestDB0_7";
+ ClusterSetup.processCommandLineArgs(command.split("\\s+"));
+ result = ClusterStateVerifier.verifyByZkCallback(
+ (new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName, errStateMap)));
+ Assert.assertTrue(result, "Cluster verification fails");
+
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < 5; i++) {
+ participants[i].syncStop();
+ }
+ deleteCluster(clusterName);
+
+ System.out.println("END " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index 59decd98e..558110857 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -589,6 +589,117 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
2);
}
+ @Test(description = "Unit test for sanity check in setPartitionsToError()")
+ public void testSetPartitionsToError() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ String instanceName = "TestInstance";
+ String testResource = "TestResource";
+ String wrongTestInstance = "WrongTestInstance";
+ String wrongTestResource = "WrongTestResource";
+ System.out.println("START " + clusterName + " at " + new
Date(System.currentTimeMillis()));
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.addCluster(clusterName, true);
+ admin.addInstance(clusterName, new InstanceConfig(instanceName));
+ admin.enableInstance(clusterName, instanceName, true);
+ InstanceConfig instanceConfig = admin.getInstanceConfig(clusterName,
instanceName);
+
+ IdealState idealState = new IdealState(testResource);
+ idealState.setNumPartitions(3);
+ admin.addStateModelDef(clusterName, "MasterSlave", new MasterSlaveSMD());
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+ admin.addResource(clusterName, testResource, idealState);
+ admin.enableResource(clusterName, testResource, true);
+
+ /*
+ * This is a unit test for sanity check in setPartitionsToError().
+ * There is no running controller in this test. We have end-to-end tests
for
+ * setPartitionsToError()
+ * under integration/TestSetPartitionsToErrorState.
+ */
+ // setPartitionsToError is expected to throw an exception when provided
with a nonexistent
+ // instance.
+ try {
+ admin.setPartitionsToError(clusterName, wrongTestInstance, testResource,
+ Arrays.asList("1", "2"));
+ Assert.fail("Should throw HelixException");
+ } catch (HelixException expected) {
+ // This exception is expected because the instance name is made up.
+ Assert.assertEquals(expected.getMessage(), String.format(
+ "Can't SET_TO_ERROR state for %s.[1, 2] on WrongTestInstance,
because %s does not exist in cluster %s",
+ testResource, wrongTestInstance, clusterName));
+ }
+
+ // setPartitionsToError is expected to throw an exception when provided
with a non-live
+ // instance.
+ try {
+ admin.setPartitionsToError(clusterName, instanceName, testResource,
Arrays.asList("1", "2"));
+ Assert.fail("Should throw HelixException");
+ } catch (HelixException expected) {
+ // This exception is expected because the instance is not alive.
+ Assert.assertEquals(expected.getMessage(),
+ String.format(
+ "Can't SET_TO_ERROR state for %s.[1, 2] on %s, because %s is not
alive in cluster %s",
+ testResource, instanceName, instanceName, clusterName));
+ }
+
+ HelixManager manager = initializeHelixManager(clusterName,
instanceConfig.getInstanceName());
+ manager.connect();
+
+ // setPartitionsToError is expected to throw an exception when provided
with a nonexistent
+ // resource.
+ try {
+ admin.setPartitionsToError(clusterName, instanceName, wrongTestResource,
+ Arrays.asList("1", "2"));
+ Assert.fail("Should throw HelixException");
+ } catch (HelixException expected) {
+ // This exception is expected because the resource is not added.
+ Assert.assertEquals(expected.getMessage(), String.format(
+ "Can't SET_TO_ERROR state for %s.[1, 2] on %s, because resource %s
is not added to cluster %s",
+ wrongTestResource, instanceName, wrongTestResource, clusterName));
+ }
+
+ // setPartitionsToError is expected to throw an exception when partition
does not exist.
+ try {
+ admin.setPartitionsToError(clusterName, instanceName, testResource,
Arrays.asList("1", "2"));
+ Assert.fail("Should throw HelixException");
+ } catch (HelixException expected) {
+ // This exception is expected because partitions do not exist.
+ Assert.assertEquals(expected.getMessage(), String.format(
+ "Can't SET_TO_ERROR state for %s.[1, 2] on %s, because not all [1,
2] exist in cluster %s",
+ testResource, instanceName, clusterName));
+ }
+
+ // clean up
+ manager.disconnect();
+ admin.dropCluster(clusterName);
+
+ // verify the cluster has been removed successfully
+ HelixDataAccessor dataAccessor =
+ new ZKHelixDataAccessor(className, new
ZkBaseDataAccessor<>(_gZkClient));
+ try {
+ Assert.assertTrue(TestHelper.verify(
+ () ->
dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances()).isEmpty(),
+ 1000));
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("There're live instances not cleaned up yet");
+ assert false;
+ }
+
+ try {
+ Assert.assertTrue(TestHelper.verify(
+ () ->
dataAccessor.getChildNames(dataAccessor.keyBuilder().clusterConfig()).isEmpty(),
+ 1000));
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("The cluster is not cleaned up yet");
+ assert false;
+ }
+ }
+
@Test
public void testResetPartition() throws Exception {
String className = TestHelper.getTestClassName();
@@ -625,7 +736,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
} catch (HelixException expected) {
// This exception is expected because the instance name is made up.
Assert.assertEquals(expected.getMessage(), String.format(
- "Can't reset state for %s.[1, 2] on WrongTestInstance, because %s
does not exist in cluster %s",
+ "Can't RESET state for %s.[1, 2] on WrongTestInstance, because %s
does not exist in cluster %s",
testResource, wrongTestInstance, clusterName));
}
@@ -636,7 +747,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
} catch (HelixException expected) {
// This exception is expected because the instance is not alive.
Assert.assertEquals(expected.getMessage(), String
- .format("Can't reset state for %s.[1, 2] on %s, because %s is not
alive in cluster %s",
+ .format("Can't RESET state for %s.[1, 2] on %s, because %s is not
alive in cluster %s",
testResource, instanceName, instanceName, clusterName));
}
@@ -650,7 +761,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
} catch (HelixException expected) {
// This exception is expected because the resource is not added.
Assert.assertEquals(expected.getMessage(), String.format(
- "Can't reset state for %s.[1, 2] on %s, because resource %s is not
added to cluster %s",
+ "Can't RESET state for %s.[1, 2] on %s, because resource %s is not
added to cluster %s",
wrongTestResource, instanceName, wrongTestResource, clusterName));
}
@@ -660,7 +771,7 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
} catch (HelixException expected) {
// This exception is expected because partitions do not exist.
Assert.assertEquals(expected.getMessage(), String.format(
- "Can't reset state for %s.[1, 2] on %s, because not all [1, 2] exist
in cluster %s",
+ "Can't RESET state for %s.[1, 2] on %s, because not all [1, 2] exist
in cluster %s",
testResource, instanceName, clusterName));
}
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 9a1311b1c..d9bc5d7fe 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -364,6 +364,12 @@ public class MockHelixAdmin implements HelixAdmin {
return null;
}
+ @Override
+ public void setPartitionsToError(String clusterName, String instanceName,
String resourceName,
+ List<String> partitionNames) {
+ // No-op in this mock: the call is accepted and nothing is done.
+ }
+
@Override public void resetPartition(String clusterName, String
instanceName, String resourceName,
List<String> partitionNames) {
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index ce3d27273..fdad634af 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -89,7 +89,8 @@ public class AbstractResource {
canCompleteSwap,
completeSwapIfPossible,
onDemandRebalance,
- isEvacuateFinished
+ isEvacuateFinished,
+ setPartitionsToError
}
@Context
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index efeeee7f7..ea98f6637 100644
---
a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++
b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -434,6 +434,16 @@ public class PerInstanceAccessor extends
AbstractHelixResource {
OBJECT_MAPPER.getTypeFactory()
.constructCollectionType(List.class, String.class)));
break;
+ case setPartitionsToError:
+ if (!validInstance(node, instanceName)) {
+ return badRequest("Instance names are not a match!");
+ }
+ admin.setPartitionsToError(clusterId, instanceName,
+ node.get(PerInstanceProperties.resource.name()).textValue(),
+ (List<String>) OBJECT_MAPPER.readValue(
+
node.get(PerInstanceProperties.partitions.name()).toString(), OBJECT_MAPPER
+ .getTypeFactory().constructCollectionType(List.class,
String.class)));
+ break;
case setInstanceOperation:
admin.setInstanceOperation(clusterId, instanceName, state);
break;
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 943444cad..395f9bf85 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -37,11 +37,13 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.TestHelper;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
@@ -377,7 +379,7 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
}
@Test(dependsOnMethods = "testDeleteInstance")
- public void updateInstance() throws IOException {
+ public void updateInstance() throws Exception {
System.out.println("Start test :" + TestHelper.getTestMethodName());
// Disable instance
Entity entity = Entity.entity("", MediaType.APPLICATION_JSON_TYPE);
@@ -461,11 +463,11 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
String dbName = "_db_0_";
List<String> partitionsToDisable = Arrays.asList(CLUSTER_NAME + dbName +
"0",
CLUSTER_NAME + dbName + "1", CLUSTER_NAME + dbName + "3");
+ String RESOURCE_NAME = CLUSTER_NAME + dbName.substring(0, dbName.length()
- 1);
entity = Entity.entity(
OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(),
- INSTANCE_NAME,
PerInstanceAccessor.PerInstanceProperties.resource.name(),
- CLUSTER_NAME + dbName.substring(0, dbName.length() - 1),
+ INSTANCE_NAME,
PerInstanceAccessor.PerInstanceProperties.resource.name(), RESOURCE_NAME,
PerInstanceAccessor.PerInstanceProperties.partitions.name(),
partitionsToDisable)),
MediaType.APPLICATION_JSON_TYPE);
@@ -474,13 +476,11 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
InstanceConfig instanceConfig =
_configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
Assert.assertEquals(
- new HashSet<>(instanceConfig.getDisabledPartitionsMap()
- .get(CLUSTER_NAME + dbName.substring(0, dbName.length() - 1))),
+ new
HashSet<>(instanceConfig.getDisabledPartitionsMap().get(RESOURCE_NAME)),
new HashSet<>(partitionsToDisable));
entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(ImmutableMap
.of(AbstractResource.Properties.id.name(), INSTANCE_NAME,
- PerInstanceAccessor.PerInstanceProperties.resource.name(),
- CLUSTER_NAME + dbName.substring(0, dbName.length() - 1),
+ PerInstanceAccessor.PerInstanceProperties.resource.name(),
RESOURCE_NAME,
PerInstanceAccessor.PerInstanceProperties.partitions.name(),
ImmutableList.of(CLUSTER_NAME + dbName + "1"))),
MediaType.APPLICATION_JSON_TYPE);
@@ -488,8 +488,7 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME,
INSTANCE_NAME);
- Assert.assertEquals(new HashSet<>(instanceConfig.getDisabledPartitionsMap()
- .get(CLUSTER_NAME + dbName.substring(0, dbName.length() - 1))),
+ Assert.assertEquals(new
HashSet<>(instanceConfig.getDisabledPartitionsMap().get(RESOURCE_NAME)),
new HashSet<>(Arrays.asList(CLUSTER_NAME + dbName + "0", CLUSTER_NAME
+ dbName + "3")));
// test set instance operation
@@ -595,6 +594,32 @@ public class TestPerInstanceAccessor extends
AbstractTestClass {
evacuateFinishedResult =
OBJECT_MAPPER.readValue(response.readEntity(String.class), Map.class);
Assert.assertEquals(response.getStatus(),
Response.Status.OK.getStatusCode());
Assert.assertTrue(evacuateFinishedResult.get("successful"));
+
+ // test setPartitionsToError
+ List<String> partitionsToSetToError = Arrays.asList(CLUSTER_NAME + dbName
+ "7");
+
+ entity = Entity.entity(
+
OBJECT_MAPPER.writeValueAsString(ImmutableMap.of(AbstractResource.Properties.id.name(),
+ INSTANCE_NAME,
PerInstanceAccessor.PerInstanceProperties.resource.name(), RESOURCE_NAME,
+ PerInstanceAccessor.PerInstanceProperties.partitions.name(),
partitionsToSetToError)),
+ MediaType.APPLICATION_JSON_TYPE);
+
+ response = new
JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setPartitionsToError")
+ .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
+
+ Assert.assertEquals(response.getStatus(),
Response.Status.OK.getStatusCode());
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+   ExternalView externalView = _gSetupTool.getClusterManagementTool()
+       .getResourceExternalView(CLUSTER_NAME, RESOURCE_NAME);
+   // Poll until every requested partition reports ERROR on this instance. States are
+   // compared with equals(): == on Strings only tests reference identity.
+   String errorState = HelixDefinedState.ERROR.toString();
+   return partitionsToSetToError.stream()
+       .allMatch(partition -> errorState
+           .equals(externalView.getStateMap(partition).get(INSTANCE_NAME)));
+ }, TestHelper.WAIT_DURATION));
+
System.out.println("End test :" + TestHelper.getTestMethodName());
}