This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit eb62d1511bec820cb0b18693303bb3b218d2160c Author: xyuanlu <[email protected]> AuthorDate: Wed Aug 23 10:34:21 2023 -0700 Add support for node evacuation. (#2604) --- .../rebalancer/util/DelayedRebalanceUtil.java | 27 +- .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 2 - .../org/apache/helix/model/InstanceConfig.java | 11 +- .../rebalancer/TestInstanceOperation.java | 421 +++++++++++++++++++++ 4 files changed, 444 insertions(+), 17 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java index 92556bb40..e7ff99765 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java @@ -29,6 +29,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.helix.HelixManager; +import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; @@ -82,37 +83,37 @@ public class DelayedRebalanceUtil { } /** - * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster - * delay rebalance configurations. + * @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes) + * while considering cluster delay rebalance configurations. */ public static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) { if (!isDelayRebalanceEnabled(clusterConfig)) { - return new HashSet<>(liveEnabledNodes); + return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes); } return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes, instanceConfigMap, clusterConfig.getRebalanceDelayTime(), clusterConfig); } /** - * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster - * and the resource delay rebalance configurations. + * @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes) + * while considering cluster delay rebalance configurations. */ public static Set<String> getActiveNodes(Set<String> allNodes, IdealState idealState, Set<String> liveEnabledNodes, Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) { if (!isDelayRebalanceEnabled(idealState, clusterConfig)) { - return new HashSet<>(liveEnabledNodes); + return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes); } return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes, instanceConfigMap, delay, clusterConfig); } private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes, - Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, - Map<String, InstanceConfig> instanceConfigMap, long delay, ClusterConfig clusterConfig) { - Set<String> activeNodes = new HashSet<>(liveEnabledNodes); + Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap, + long delay, ClusterConfig clusterConfig) { + Set<String> activeNodes = filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes); Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes); offlineOrDisabledInstances.removeAll(liveEnabledNodes); long currentTime = System.currentTimeMillis(); @@ -128,6 +129,14 @@ public class DelayedRebalanceUtil { return activeNodes; } + private static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap, + Set<String> nodes) { + return nodes.stream() + .filter(instance -> !instanceConfigMap.get(instance).getInstanceOperation().equals( + InstanceConstants.InstanceOperation.EVACUATE.name())) + .collect(Collectors.toSet()); + } + /** * Return the time when an offline or disabled instance should be treated as inactive. Return -1 * if it is inactive now or forced to be rebalanced by an on-demand rebalance. diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 917cb25de..15b38fcbc 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -396,8 +396,6 @@ public class ZKHelixAdmin implements HelixAdmin { } InstanceConfig config = new InstanceConfig(currentData); - // TODO: add sanity check in config.setInstanceOperation and throw exception when needed. - // TODO: Also instance enabled in instance config config.setInstanceOperation(instanceOperation); return config.getRecord(); } diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index fdcb74517..193019d0e 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -345,11 +345,11 @@ public class InstanceConfig extends HelixProperty { public void setInstanceOperation(InstanceConstants.InstanceOperation operation) { if (operation != InstanceConstants.InstanceOperation.DISABLE - && operation != InstanceConstants.InstanceOperation.ENABLE ){ - if( !getInstanceEnabled()) { - throw new HelixException( - "setting non enable/disable operation (e.g. evacuate, swap) to helix disabled instance is not allowed"); - } + && operation != InstanceConstants.InstanceOperation.ENABLE) { + if (!getInstanceEnabled()) { + throw new HelixException( + "setting non enable/disable operation (e.g. evacuate, swap) to helix disabled instance is not allowed"); + } } else { setInstanceEnabledHelper(operation == InstanceConstants.InstanceOperation.ENABLE); } @@ -381,7 +381,6 @@ public class InstanceConfig extends HelixProperty { /** * Check if this instance is enabled for a given partition - * * @param partition the partition name to check * @return true if the instance is enabled for the partition, false otherwise */ diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java new file mode 100644 index 000000000..2276ab1c3 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -0,0 +1,421 @@ +package org.apache.helix.integration.rebalancer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixRollbackException; +import org.apache.helix.NotificationContext; +import org.apache.helix.TestHelper; +import org.apache.helix.api.status.ClusterManagementMode; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.constants.InstanceConstants; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBucketDataAccessor; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.Message; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.Transition; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestInstanceOperation extends ZkTestBase { + protected final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int PARTITIONS = 20; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + private int REPLICA = 3; + protected ClusterControllerManager _controller; + List<MockParticipantManager> _participants = new ArrayList<>(); + List<String> _participantNames = new ArrayList<>(); + private Set<String> _allDBs = new HashSet<>(); + private ZkHelixClusterVerifier _clusterVerifier; + private ConfigAccessor _configAccessor; + private long _stateModelDelay = 30L; + protected AssignmentMetadataStore _assignmentMetadataStore; + HelixDataAccessor _dataAccessor; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + addParticipant(participantName); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setDeactivatedNodeAwareness(true) + .setResources(_allDBs) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) + .build(); + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + _configAccessor = new ConfigAccessor(_gZkClient); + _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.stateTransitionCancelEnabled(true); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + createTestDBs(200); + + setUpWagedBaseline(); + } + + @Test + public void testEvacuate() throws Exception { + // EV should contain all participants, check resources one by one + Map<String, ExternalView> assignment = getEV(); + for (String resource : _allDBs) { + Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); + } + + // evacuated instance + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // New ev should contain all instances but the evacuated one + assignment = getEV(); + List<String> currentActiveInstances = + _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList()); + for (String resource : _allDBs) { + validateAssignmentInEv(assignment.get(resource)); + Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); + Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate)); + Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); + } + } + + @Test(dependsOnMethods = "testEvacuate") + public void testRevertEvacuation() throws Exception { + + // revert an evacuate instance + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // EV should contain all participants, check resources one by one + Map<String, ExternalView> assignment = getEV(); + for (String resource : _allDBs) { + Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); + validateAssignmentInEv(assignment.get(resource)); + } + } + + @Test(dependsOnMethods = "testRevertEvacuation") + public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { + // add a resource where downward state transition is slow + createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA, + REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName()); + _allDBs.add("TEST_DB3_DELAYED_CRUSHED"); + // add a resource where downward state transition is slow + createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave", + PARTITIONS, REPLICA, REPLICA - 1); + _allDBs.add("TEST_DB4_DELAYED_WAGED"); + // wait for assignment to finish + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // set bootstrap ST delay to a large number + _stateModelDelay = -300000L; + // evacuate an instance + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); + // Messages should be pending at all instances besides the evacuate one + for (String participant : _participantNames) { + if (participant.equals(instanceToEvacuate)) { + continue; + } + TestHelper.verify( + () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000); + } + + // sleep a bit so ST messages can start executing + Thread.sleep(Math.abs(_stateModelDelay / 100)); + // before we cancel, check current EV + Map<String, ExternalView> assignment = getEV(); + for (String resource : _allDBs) { + // check every replica has >= 3 partitions and a top state partition + validateAssignmentInEv(assignment.get(resource)); + } + + // cancel the evacuation by setting instance operation back to `ENABLE` + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE); + + assignment = getEV(); + for (String resource : _allDBs) { + // check every replica has >= 3 active replicas, even before cluster converge + validateAssignmentInEv(assignment.get(resource)); + } + + // check cluster converge. We have longer delay for ST then verifier timeout. It will only converge if we cancel ST. + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // EV should contain all participants, check resources one by one + assignment = getEV(); + for (String resource : _allDBs) { + Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); + // check every replica has >= 3 active replicas again + validateAssignmentInEv(assignment.get(resource)); + } + } + + @Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish") + public void testEvacuateAndCancelBeforeDropFinish() throws Exception { + + // set DROP ST delay to a large number + _stateModelDelay = 300000L; + + // evacuate an instance + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); + + // message should be pending at the to evacuate participant + TestHelper.verify( + () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000); + + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE); + // check every replica has >= 3 active replicas, even before cluster converge + Map<String, ExternalView> assignment = getEV(); + for (String resource : _allDBs) { + validateAssignmentInEv(assignment.get(resource)); + } + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // EV should contain all participants, check resources one by one + assignment = getEV(); + for (String resource : _allDBs) { + Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); + // check every replica has >= 3 active replicas + validateAssignmentInEv(assignment.get(resource)); + } + } + + @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish") + public void testMarkEvacuationAfterEMM() throws Exception { + _stateModelDelay = 1000L; + Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME)); + _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, + null); + addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE)); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + Map<String, ExternalView> assignment = getEV(); + for (String resource : _allDBs) { + Assert.assertFalse(getParticipantsInEv(assignment.get(resource)).contains(_participantNames.get(NUM_NODE))); + } + + // set evacuate operation + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); + + // there should be no evacuation happening + for (String resource : _allDBs) { + Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate)); + } + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // exit MM + _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, + null); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + assignment = getEV(); + List<String> currentActiveInstances = + _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList()); + for (String resource : _allDBs) { + validateAssignmentInEv(assignment.get(resource)); + Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); + Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate)); + Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); + } + + } + + private void addParticipant(String participantName) { + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, participantName); + + // start dummy participants + MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName); + StateMachineEngine stateMachine = participant.getStateMachineEngine(); + // Using a delayed state model + StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory(); + stateMachine.registerStateModelFactory("MasterSlave", delayFactory); + + participant.syncStart(); + _participants.add(participant); + _participantNames.add(participantName); + } + + private void createTestDBs(long delayTime) throws InterruptedException { + createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB1_CRUSHED", + BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, 200, + CrushEdRebalanceStrategy.class.getName()); + _allDBs.add("TEST_DB1_CRUSHED"); + createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB2_WAGED", BuiltInStateModelDefinitions.LeaderStandby.name(), + PARTITIONS, REPLICA, REPLICA - 1); + _allDBs.add("TEST_DB2_WAGED"); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + + private Map<String, ExternalView> getEV() { + Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); + for (String db : _allDBs) { + ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + externalViews.put(db, ev); + } + return externalViews; + } + + private Set<String> getParticipantsInEv(ExternalView ev) { + Set<String> assignedParticipants = new HashSet<>(); + ev.getPartitionSet().forEach(partition -> assignedParticipants.addAll(ev.getStateMap(partition).keySet())); + return assignedParticipants; + } + + // verify that each partition has >=REPLICA (3 in this case) replicas + private void validateAssignmentInEv(ExternalView ev) { + Set<String> partitionSet = ev.getPartitionSet(); + for (String partition : partitionSet) { + AtomicInteger activeReplicaCount = new AtomicInteger(); + ev.getStateMap(partition) + .values() + .stream() + .filter(v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals("STANDBY")) + .forEach(v -> activeReplicaCount.getAndIncrement()); + Assert.assertTrue(activeReplicaCount.get() >=REPLICA); + + } + } + + private void setUpWagedBaseline() { + _assignmentMetadataStore = new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), CLUSTER_NAME) { + public Map<String, ResourceAssignment> getBaseline() { + // Ensure this metadata store always read from the ZK without using cache. + super.reset(); + return super.getBaseline(); + } + + public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment() { + // Ensure this metadata store always read from the ZK without using cache. + super.reset(); + return super.getBestPossibleAssignment(); + } + }; + + // Set test instance capacity and partition weights + ClusterConfig clusterConfig = _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig()); + String testCapacityKey = "TestCapacityKey"; + clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey)); + clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100)); + clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1)); + _dataAccessor.setProperty(_dataAccessor.keyBuilder().clusterConfig(), clusterConfig); + } + + // A state transition model where either downward ST are slow (_stateModelDelay >0) or upward ST are slow (_stateModelDelay <0) + public class StDelayMSStateModelFactory extends StateModelFactory<StDelayMSStateModel> { + + @Override + public StDelayMSStateModel createNewStateModel(String resourceName, String partitionKey) { + StDelayMSStateModel model = new StDelayMSStateModel(); + return model; + } + } + + @StateModelInfo(initialState = "OFFLINE", states = {"MASTER", "SLAVE", "ERROR"}) + public class StDelayMSStateModel extends StateModel { + + public StDelayMSStateModel() { + _cancelled = false; + } + + private void sleepWhileNotCanceled(long sleepTime) throws InterruptedException{ + while(sleepTime >0 && !isCancelled()) { + Thread.sleep(5000); + sleepTime =- 5000; + } + if (isCancelled()) { + _cancelled = false; + throw new HelixRollbackException("EX"); + } + } + + @Transition(to = "SLAVE", from = "OFFLINE") + public void onBecomeSlaveFromOffline(Message message, NotificationContext context) throws InterruptedException { + if (_stateModelDelay < 0) { + sleepWhileNotCanceled(Math.abs(_stateModelDelay)); + } + } + + @Transition(to = "MASTER", from = "SLAVE") + public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException { + if (_stateModelDelay < 0) { + sleepWhileNotCanceled(Math.abs(_stateModelDelay)); + } + } + + @Transition(to = "SLAVE", from = "MASTER") + public void onBecomeSlaveFromMaster(Message message, NotificationContext context) throws InterruptedException { + if (_stateModelDelay > 0) { + sleepWhileNotCanceled(_stateModelDelay); + } + } + + @Transition(to = "OFFLINE", from = "SLAVE") + public void onBecomeOfflineFromSlave(Message message, NotificationContext context) throws InterruptedException { + if (_stateModelDelay > 0) { + sleepWhileNotCanceled(_stateModelDelay); + } + } + + @Transition(to = "DROPPED", from = "OFFLINE") + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) throws InterruptedException { + if (_stateModelDelay > 0) { + sleepWhileNotCanceled(_stateModelDelay); + } + } + } +}
