Repository: helix Updated Branches: refs/heads/master f4bb7d607 -> 24c52394d
[HELIX-741] make swap instance more robust and idempotent Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/24c52394 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/24c52394 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/24c52394 Branch: refs/heads/master Commit: 24c52394dfff91c045367260c969f76560ebeb62 Parents: f4bb7d6 Author: Harry Zhang <[email protected]> Authored: Tue Jul 17 18:21:48 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Tue Jul 17 18:21:48 2018 -0700 ---------------------------------------------------------------------- .../org/apache/helix/tools/ClusterSetup.java | 109 +++++++++----- .../helix/integration/TestSwapInstance.java | 150 ++++++++++++------- 2 files changed, 170 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/24c52394/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java index 030cd3d..94a5f70 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java @@ -26,7 +26,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; - +import org.I0Itec.zkclient.DataUpdater; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -46,10 +46,10 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; +import 
org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; -import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ConstraintItem; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; @@ -210,11 +210,11 @@ public class ClusterSetup { InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(instanceId); instanceId = instanceConfig.getInstanceName(); - // ensure node is stopped + // ensure node is not live LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceId)); if (liveInstance != null) { - throw new HelixException("Can't drop " + instanceId + ", please stop " + instanceId - + " before drop it"); + throw new HelixException(String + .format("Cannot drop instance %s as it is still live. Please stop it first", instanceId)); } InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceId)); @@ -235,18 +235,32 @@ public class ClusterSetup { _admin.dropInstance(clusterName, config); } - public void swapInstance(String clusterName, String oldInstanceName, String newInstanceName) { + /** + * For CUSTOMIZED and SEMI_AUTO resources, this tool changes the instance mapping + * in the cluster. When a node is replaced in the cluster, we change the preference lists + * and map fields in the IdealState of every resource, replacing the old instance with the new instance. + * + * Resources in FULL_AUTO rebalance mode are skipped.
+ This method requires the old instance to be disabled AND not live, but the new instance may + have just been created and need not be live or enabled yet + * + * @param clusterName cluster name + * @param oldInstanceName old instance to swap out + * @param newInstanceName new instance to swap in + */ + public void swapInstance(String clusterName, final String oldInstanceName, final String newInstanceName) { + if (oldInstanceName.equals(newInstanceName)) { + _logger.info("Old instance has same name as new instance, no need to swap"); + return; + } + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); Builder keyBuilder = accessor.keyBuilder(); - InstanceConfig oldConfig = accessor.getProperty(keyBuilder.instanceConfig(oldInstanceName)); - if (oldConfig == null) { - String error = "Old instance " + oldInstanceName + " does not exist, cannot swap"; - _logger.warn(error); - throw new HelixException(error); - } - + // If the new instance config is missing, the new instance is not in a good state, so we + // should not perform the swap.
+ // It is OK that we miss old instance config for idempotency of this method InstanceConfig newConfig = accessor.getProperty(keyBuilder.instanceConfig(newInstanceName)); if (newConfig == null) { String error = "New instance " + newInstanceName + " does not exist, cannot swap"; @@ -254,36 +268,57 @@ public class ClusterSetup { throw new HelixException(error); } - ClusterConfig clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); - // ensure old instance is disabled, otherwise fail - if (oldConfig.getInstanceEnabled() && (clusterConfig.getDisabledInstances() == null - || !clusterConfig.getDisabledInstances().containsKey(oldInstanceName))) { - String error = - "Old instance " + oldInstanceName + " is enabled, it need to be disabled and turned off"; - _logger.warn(error); - throw new HelixException(error); - } - // ensure old instance is down, otherwise fail - List<String> liveInstanceNames = accessor.getChildNames(accessor.keyBuilder().liveInstances()); - - if (liveInstanceNames.contains(oldInstanceName)) { - String error = - "Old instance " + oldInstanceName + " is still on, it need to be disabled and turned off"; - _logger.warn(error); - throw new HelixException(error); + try { + // drop instance will ensure the old instance is disabled, and not live, or it will + // throw exception + dropInstanceFromCluster(clusterName, oldInstanceName); + } catch (HelixException e) { + // If old instance is already gone, continue to swap. 
Note that it is possible + // that, due to some error, a disabled record of the old instance is still kept in + // the cluster config; we don't strictly check for or fix that + if (e.toString().contains("does not exist")) { + _logger.warn("Instance {} does not exist, continue to swap instance for cluster {}", + oldInstanceName, clusterName); + } else { + _logger.warn("Failed to drop instance {} from cluster {}", oldInstanceName, clusterName, e); + throw e; + } } - dropInstanceFromCluster(clusterName, oldInstanceName); + // When the amount of ideal state data is huge, we might only read it partially from ZK, + // so the safest way is to list the names first and read each ideal state individually + List<String> existingIdealStateNames = + accessor.getChildNames(accessor.keyBuilder().idealStates()); - List<IdealState> existingIdealStates = - accessor.getChildValues(accessor.keyBuilder().idealStates()); - for (IdealState idealState : existingIdealStates) { - swapInstanceInIdealState(idealState, oldInstanceName, newInstanceName); - accessor.setProperty(accessor.keyBuilder().idealStates(idealState.getResourceName()), - idealState); + for (String resourceName : existingIdealStateNames) { + IdealState resourceIdealState = + accessor.getProperty(accessor.keyBuilder().idealStates(resourceName)); + if (resourceIdealState.getRebalanceMode().equals(RebalanceMode.FULL_AUTO)) { + _logger.warn("Resource {} is in FULL_AUTO rebalance mode, don't swap", resourceName); + continue; + } + // For CUSTOMIZED and SEMI_AUTO rebalance modes, swap the instance + swapInstanceInIdealState(resourceIdealState, oldInstanceName, newInstanceName); + + // Update ideal state + accessor.updateProperty(accessor.keyBuilder().idealStates(resourceName), + new DataUpdater<ZNRecord>() { + @Override public ZNRecord update(ZNRecord znRecord) { + // Need to swap again in case partitions with the old instance were added + swapInstanceInIdealState(new IdealState(znRecord), oldInstanceName, newInstanceName); + return znRecord; + } + },
resourceIdealState); + _logger.info("Successfully swapped instance for resource {}", resourceName); } } + /** + * Replace old instance name in map field and list field with new instance name + * @param idealState ideal state object + * @param oldInstance old instance name + * @param newInstance new instance name + */ void swapInstanceInIdealState(IdealState idealState, String oldInstance, String newInstance) { for (String partition : idealState.getRecord().getMapFields().keySet()) { Map<String, String> valMap = idealState.getRecord().getMapField(partition); http://git-wip-us.apache.org/repos/asf/helix/blob/24c52394/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java index 8db7274..207fcf1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java @@ -21,99 +21,145 @@ package org.apache.helix.integration; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.integration.common.ZkStandAloneCMTestBase; -import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.IdealState; -import org.apache.helix.tools.ClusterStateVerifier; import org.testng.Assert; import org.testng.annotations.Test; public class TestSwapInstance extends ZkStandAloneCMTestBase { @Test - public void TestSwap() throws Exception { + public void testSwapInstance() throws Exception { HelixManager manager = _controller; - HelixDataAccessor helixAccessor = manager.getHelixDataAccessor(); - _gSetupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 
64, STATE_MODEL); - _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica); + HelixDataAccessor dataAccessor = manager.getHelixDataAccessor(); - ZNRecord idealStateOld1 = new ZNRecord("TestDB"); - ZNRecord idealStateOld2 = new ZNRecord("MyDB"); + // Create semi auto resource + _gSetupTool.addResourceToCluster(CLUSTER_NAME, "db-semi", 64, STATE_MODEL, + IdealState.RebalanceMode.SEMI_AUTO.name()); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "db-semi", _replica); - IdealState is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB")); - idealStateOld1.merge(is1.getRecord()); + // Create customized resource + _gSetupTool.addResourceToCluster(CLUSTER_NAME, "db-customized", 64, STATE_MODEL, + IdealState.RebalanceMode.CUSTOMIZED.name()); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "db-customized", _replica); - IdealState is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB")); - idealStateOld2.merge(is2.getRecord()); + // Create full-auto resource + _gSetupTool.addResourceToCluster(CLUSTER_NAME, "db-fa", 64, STATE_MODEL, + IdealState.RebalanceMode.FULL_AUTO.name(), RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "db-fa", _replica); - Assert.assertTrue(ClusterStateVerifier.verifyByPolling( - new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME))); + // Wait for cluster converge + Assert.assertTrue(_clusterVerifier.verifyByPolling()); - String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0); - _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false); + // Get ideal states before swap + IdealState semiIS = dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates("db-semi")); + IdealState customizedIS = + dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates("db-customized")); + IdealState faIs = + dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates("db-fa")); 
- Assert.assertTrue(_clusterVerifier.verifyByPolling()); + String oldInstanceName = String.format("%s_%s", PARTICIPANT_PREFIX, START_PORT); + String newInstanceName = String.format("%s_%s", PARTICIPANT_PREFIX, 66666); + + try { + _gSetupTool.swapInstance(CLUSTER_NAME, oldInstanceName, newInstanceName); + Assert.fail("Cannot swap as new instance is not added to cluster yet"); + } catch (Exception e) { + // OK - new instance not added to cluster yet + } - String instanceName2 = PARTICIPANT_PREFIX + "_" + (START_PORT + 444); - _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName2); + // Add new instance to cluster + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newInstanceName); - boolean exception = false; try { - _gSetupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2); + _gSetupTool.swapInstance(CLUSTER_NAME, oldInstanceName, newInstanceName); + Assert.fail("Cannot swap as old instance is still alive"); } catch (Exception e) { - exception = true; + // OK - old instance still alive } - Assert.assertTrue(exception); + // Stop old instance _participants[0].syncStop(); Assert.assertTrue(_clusterVerifier.verifyByPolling()); - exception = false; try { - _gSetupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2); + _gSetupTool.swapInstance(CLUSTER_NAME, oldInstanceName, newInstanceName); + Assert.fail("Cannot swap as old instance is still enabled"); } catch (Exception e) { - e.printStackTrace(); - exception = true; + // OK - old instance still enabled } - Assert.assertFalse(exception); - MockParticipantManager newParticipant = - new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName2); - newParticipant.syncStart(); + // disable old instance + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, oldInstanceName, false); Assert.assertTrue(_clusterVerifier.verifyByPolling()); - is1 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB")); + // We can swap now + _gSetupTool.swapInstance(CLUSTER_NAME,
oldInstanceName, newInstanceName); + + // verify cluster + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + verifySwapInstance(dataAccessor, "db-semi", semiIS, oldInstanceName, newInstanceName, false); + verifySwapInstance(dataAccessor, "db-customized", customizedIS, oldInstanceName, newInstanceName, + false); + verifySwapInstance(dataAccessor, "db-fa", faIs, oldInstanceName, newInstanceName, true); + + // Verify idempotency + _gSetupTool.swapInstance(CLUSTER_NAME, oldInstanceName, newInstanceName); + verifySwapInstance(dataAccessor, "db-semi", semiIS, oldInstanceName, newInstanceName, false); + verifySwapInstance(dataAccessor, "db-customized", customizedIS, oldInstanceName, newInstanceName, + false); + verifySwapInstance(dataAccessor, "db-fa", faIs, oldInstanceName, newInstanceName, true); - is2 = helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB")); + } + + private void verifySwapInstance(HelixDataAccessor dataAccessor, String resourceName, + IdealState oldIs, String oldInstance, String newInstance, boolean isFullAuto) { + IdealState newIs = dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates(resourceName)); + if (isFullAuto) { + // Full auto resource should not contain new instance as it's not live yet + for (String key : newIs.getRecord().getMapFields().keySet()) { + Assert.assertFalse(newIs.getRecord().getMapField(key).keySet().contains(newInstance)); + } + + for (String key : newIs.getRecord().getListFields().keySet()) { + Assert.assertFalse(newIs.getRecord().getListField(key).contains(newInstance)); + } + } else { + verifyIdealStateWithSwappedInstance(oldIs, newIs, oldInstance, newInstance); + } + } + + private void verifyIdealStateWithSwappedInstance(IdealState oldIs, IdealState newIs, + String oldInstance, String newInstance) { - for (String key : idealStateOld1.getMapFields().keySet()) { - for (String host : idealStateOld1.getMapField(key).keySet()) { - if (host.equals(instanceName)) { - 
Assert.assertTrue(idealStateOld1.getMapField(key).get(instanceName) - .equals(is1.getRecord().getMapField(key).get(instanceName2))); + // Verify map fields + for (String key : oldIs.getRecord().getMapFields().keySet()) { + for (String host : oldIs.getRecord().getMapField(key).keySet()) { + if (host.equals(oldInstance)) { + Assert.assertTrue(oldIs.getRecord().getMapField(key).get(oldInstance) + .equals(newIs.getRecord().getMapField(key).get(newInstance))); } else { - Assert.assertTrue(idealStateOld1.getMapField(key).get(host) - .equals(is1.getRecord().getMapField(key).get(host))); + Assert.assertTrue(oldIs.getRecord().getMapField(key).get(host) + .equals(newIs.getRecord().getMapField(key).get(host))); } } } - for (String key : idealStateOld1.getListFields().keySet()) { - Assert.assertEquals(idealStateOld1.getListField(key).size(), is1.getRecord() + // verify list fields + for (String key : oldIs.getRecord().getListFields().keySet()) { + Assert.assertEquals(oldIs.getRecord().getListField(key).size(), newIs.getRecord() .getListField(key).size()); - for (int i = 0; i < idealStateOld1.getListField(key).size(); i++) { - String host = idealStateOld1.getListField(key).get(i); - String newHost = is1.getRecord().getListField(key).get(i); - if (host.equals(instanceName)) { - Assert.assertTrue(newHost.equals(instanceName2)); + for (int i = 0; i < oldIs.getRecord().getListField(key).size(); i++) { + String host = oldIs.getRecord().getListField(key).get(i); + String newHost = newIs.getRecord().getListField(key).get(i); + if (host.equals(oldInstance)) { + Assert.assertTrue(newHost.equals(newInstance)); } else { - // System.out.println(key + " " + i+ " " + host + " "+newHost); - // System.out.println(idealStateOld1.getListField(key)); - // System.out.println(is1.getRecord().getListField(key)); - Assert.assertTrue(host.equals(newHost)); } } } } + }
