Repository: helix
Updated Branches:
  refs/heads/master f4bb7d607 -> 24c52394d


[HELIX-741] make swap instance more robust and idempotent


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/24c52394
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/24c52394
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/24c52394

Branch: refs/heads/master
Commit: 24c52394dfff91c045367260c969f76560ebeb62
Parents: f4bb7d6
Author: Harry Zhang <[email protected]>
Authored: Tue Jul 17 18:21:48 2018 -0700
Committer: Harry Zhang <[email protected]>
Committed: Tue Jul 17 18:21:48 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/tools/ClusterSetup.java    | 109 +++++++++-----
 .../helix/integration/TestSwapInstance.java     | 150 ++++++++++++-------
 2 files changed, 170 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/24c52394/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 030cd3d..94a5f70 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-
+import org.I0Itec.zkclient.DataUpdater;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -46,10 +46,10 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ConstraintItem;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
@@ -210,11 +210,11 @@ public class ClusterSetup {
     InstanceConfig instanceConfig = 
InstanceConfig.toInstanceConfig(instanceId);
     instanceId = instanceConfig.getInstanceName();
 
-    // ensure node is stopped
+    // ensure node is not live
     LiveInstance liveInstance = 
accessor.getProperty(keyBuilder.liveInstance(instanceId));
     if (liveInstance != null) {
-      throw new HelixException("Can't drop " + instanceId + ", please stop " + 
instanceId
-          + " before drop it");
+      throw new HelixException(String
+          .format("Cannot drop instance %s as it is still live. Please stop it 
first", instanceId));
     }
 
     InstanceConfig config = 
accessor.getProperty(keyBuilder.instanceConfig(instanceId));
@@ -235,18 +235,32 @@ public class ClusterSetup {
     _admin.dropInstance(clusterName, config);
   }
 
-  public void swapInstance(String clusterName, String oldInstanceName, String 
newInstanceName) {
+  /**
+   * For CUSTOMIZED and SEMI_AUTO resources, this tool is used to change 
instance mapping
+   * in the cluster. When a node is replaced in the cluster, we just change 
preference list
+   * and map field in IdealState lf all resource, to replace old instance with 
new instance
+   *
+   * This method will ignore all resource with FULL_AUTO.
+   * This method will ensure that old instance is disabled AND not alive, but 
it's OK that new
+   * instance is just created, not live / enabled yet
+   *
+   * @param clusterName cluster name
+   * @param oldInstanceName old instance to swap out
+   * @param newInstanceName new instance to add to
+   */
+  public void swapInstance(String clusterName, final String oldInstanceName, 
final String newInstanceName) {
+    if (oldInstanceName.equals(newInstanceName)) {
+      _logger.info("Old instance has same name as new instance, no need to 
swap");
+      return;
+    }
+
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
-    InstanceConfig oldConfig = 
accessor.getProperty(keyBuilder.instanceConfig(oldInstanceName));
-    if (oldConfig == null) {
-      String error = "Old instance " + oldInstanceName + " does not exist, 
cannot swap";
-      _logger.warn(error);
-      throw new HelixException(error);
-    }
-
+    // If new instance config is missing, new instance is not in good state 
and therefore
+    // should not perform swap.
+    // It is OK that we miss old instance config for idempotency of this method
     InstanceConfig newConfig = 
accessor.getProperty(keyBuilder.instanceConfig(newInstanceName));
     if (newConfig == null) {
       String error = "New instance " + newInstanceName + " does not exist, 
cannot swap";
@@ -254,36 +268,57 @@ public class ClusterSetup {
       throw new HelixException(error);
     }
 
-    ClusterConfig clusterConfig = 
accessor.getProperty(keyBuilder.clusterConfig());
-    // ensure old instance is disabled, otherwise fail
-    if (oldConfig.getInstanceEnabled() && 
(clusterConfig.getDisabledInstances() == null
-        || 
!clusterConfig.getDisabledInstances().containsKey(oldInstanceName))) {
-      String error =
-          "Old instance " + oldInstanceName + " is enabled, it need to be 
disabled and turned off";
-      _logger.warn(error);
-      throw new HelixException(error);
-    }
-    // ensure old instance is down, otherwise fail
-    List<String> liveInstanceNames = 
accessor.getChildNames(accessor.keyBuilder().liveInstances());
-
-    if (liveInstanceNames.contains(oldInstanceName)) {
-      String error =
-          "Old instance " + oldInstanceName + " is still on, it need to be 
disabled and turned off";
-      _logger.warn(error);
-      throw new HelixException(error);
+    try {
+      // drop instance will ensure the old instance is disabled, and not live, 
or it will
+      // throw exception
+      dropInstanceFromCluster(clusterName, oldInstanceName);
+    } catch (HelixException e) {
+      // If old instance is already gone, continue to swap. Note that it is 
possible
+      // that do to some error, we still keep a disabled record of old 
instance in
+      // cluster config, we don't strictly check and fix that
+      if (e.toString().contains("does not exist")) {
+        _logger.warn("Instance {} does not exist, continue to swap instance 
for cluster {}",
+            oldInstanceName, clusterName);
+      } else {
+        _logger.warn("Failed to drop instance {} from cluster {}", 
oldInstanceName, clusterName, e);
+        throw e;
+      }
     }
 
-    dropInstanceFromCluster(clusterName, oldInstanceName);
+    // When the amount of ideal state data is huge, we might only read 
partially from ZK
+    // so the safest way is to list first and read each individual ideal state
+    List<String> existingIdealStateNames =
+        accessor.getChildNames(accessor.keyBuilder().idealStates());
 
-    List<IdealState> existingIdealStates =
-        accessor.getChildValues(accessor.keyBuilder().idealStates());
-    for (IdealState idealState : existingIdealStates) {
-      swapInstanceInIdealState(idealState, oldInstanceName, newInstanceName);
-      
accessor.setProperty(accessor.keyBuilder().idealStates(idealState.getResourceName()),
-          idealState);
+    for (String resourceName : existingIdealStateNames) {
+      IdealState resourceIdealState =
+          
accessor.getProperty(accessor.keyBuilder().idealStates(resourceName));
+      if 
(resourceIdealState.getRebalanceMode().equals(RebalanceMode.FULL_AUTO)) {
+        _logger.warn("Resource {} is in FULL_AUTO rebalance mode, don't swap", 
resourceName);
+        continue;
+      }
+      // For CUSTOMIZED and SEMI_AUTO rebalance mode, swap instance
+      swapInstanceInIdealState(resourceIdealState, oldInstanceName, 
newInstanceName);
+
+      // Update ideal state
+      accessor.updateProperty(accessor.keyBuilder().idealStates(resourceName),
+          new DataUpdater<ZNRecord>() {
+            @Override public ZNRecord update(ZNRecord znRecord) {
+              // Need to swap again in case there are added partition with old 
instance
+              swapInstanceInIdealState(new IdealState(znRecord), 
oldInstanceName, newInstanceName);
+              return znRecord;
+            }
+          }, resourceIdealState);
+      _logger.info("Successfully swapped instance for resource {}", 
resourceName);
     }
   }
 
+  /**
+   * Replace old instance name in map field and list field with new instance 
name
+   * @param idealState ideal state object
+   * @param oldInstance old instance name
+   * @param newInstance new instance name
+   */
   void swapInstanceInIdealState(IdealState idealState, String oldInstance, 
String newInstance) {
     for (String partition : idealState.getRecord().getMapFields().keySet()) {
       Map<String, String> valMap = 
idealState.getRecord().getMapField(partition);

http://git-wip-us.apache.org/repos/asf/helix/blob/24c52394/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
index 8db7274..207fcf1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSwapInstance.java
@@ -21,99 +21,145 @@ package org.apache.helix.integration;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
-import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.tools.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestSwapInstance extends ZkStandAloneCMTestBase {
   @Test
-  public void TestSwap() throws Exception {
+  public void testSwapInstance() throws Exception {
     HelixManager manager = _controller;
-    HelixDataAccessor helixAccessor = manager.getHelixDataAccessor();
-    _gSetupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", 64, STATE_MODEL);
-    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", _replica);
+    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
 
-    ZNRecord idealStateOld1 = new ZNRecord("TestDB");
-    ZNRecord idealStateOld2 = new ZNRecord("MyDB");
+    // Create semi auto resource
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, "db-semi", 64, STATE_MODEL,
+        IdealState.RebalanceMode.SEMI_AUTO.name());
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "db-semi", _replica);
 
-    IdealState is1 = 
helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
-    idealStateOld1.merge(is1.getRecord());
+    // Create customized resource
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, "db-customized", 64, 
STATE_MODEL,
+        IdealState.RebalanceMode.CUSTOMIZED.name());
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "db-customized", 
_replica);
 
-    IdealState is2 = 
helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
-    idealStateOld2.merge(is2.getRecord());
+    // Create full-auto resource
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, "db-fa", 64, STATE_MODEL,
+        IdealState.RebalanceMode.FULL_AUTO.name(), 
RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "db-fa", _replica);
 
-    Assert.assertTrue(ClusterStateVerifier.verifyByPolling(
-        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, 
CLUSTER_NAME)));
+    // Wait for cluster converge
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
instanceName, false);
+    // Get ideal states before swap
+    IdealState semiIS = 
dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates("db-semi"));
+    IdealState customizedIS =
+        
dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates("db-customized"));
+    IdealState faIs =
+        
dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates("db-fa"));
 
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    String oldInstanceName = String.format("%s_%s", PARTICIPANT_PREFIX, 
START_PORT);
+    String newInstanceName = String.format("%s_%s", PARTICIPANT_PREFIX, 66666);
+
+    try {
+      _gSetupTool.swapInstance(CLUSTER_NAME, oldInstanceName, newInstanceName);
+      Assert.fail("Cannot swap as new instance is not added to cluster yet");
+    } catch (Exception e) {
+      // OK - new instance not added to cluster yet
+    }
 
-    String instanceName2 = PARTICIPANT_PREFIX + "_" + (START_PORT + 444);
-    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName2);
+    // Add new instance to cluster
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newInstanceName);
 
-    boolean exception = false;
     try {
-      _gSetupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
+      _gSetupTool.swapInstance(CLUSTER_NAME, oldInstanceName, newInstanceName);
+      Assert.fail("Cannot swap as old instance is still alive");
     } catch (Exception e) {
-      exception = true;
+      // OK - old instance still alive
     }
-    Assert.assertTrue(exception);
 
+    // Stop old instance
     _participants[0].syncStop();
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    exception = false;
     try {
-      _gSetupTool.swapInstance(CLUSTER_NAME, instanceName, instanceName2);
+      _gSetupTool.swapInstance(CLUSTER_NAME, oldInstanceName, newInstanceName);
+      Assert.fail("Cannot swap as old instance is still enabled");
     } catch (Exception e) {
-      e.printStackTrace();
-      exception = true;
+      // OK - old instance still alive
     }
-    Assert.assertFalse(exception);
-    MockParticipantManager newParticipant =
-        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName2);
-    newParticipant.syncStart();
 
+    // disable old instance
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
oldInstanceName, false);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    is1 = 
helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("TestDB"));
+    // We can swap now
+    _gSetupTool.swapInstance(CLUSTER_NAME, oldInstanceName, newInstanceName);
+
+    // verify cluster
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    verifySwapInstance(dataAccessor, "db-semi", semiIS, oldInstanceName, 
newInstanceName, false);
+    verifySwapInstance(dataAccessor, "db-customized", customizedIS, 
oldInstanceName, newInstanceName,
+        false);
+    verifySwapInstance(dataAccessor, "db-fa", faIs, oldInstanceName, 
newInstanceName, true);
+
+    // Verify idempotency
+    _gSetupTool.swapInstance(CLUSTER_NAME, oldInstanceName, newInstanceName);
+    verifySwapInstance(dataAccessor, "db-semi", semiIS, oldInstanceName, 
newInstanceName, false);
+    verifySwapInstance(dataAccessor, "db-customized", customizedIS, 
oldInstanceName, newInstanceName,
+        false);
+    verifySwapInstance(dataAccessor, "db-fa", faIs, oldInstanceName, 
newInstanceName, true);
 
-    is2 = 
helixAccessor.getProperty(helixAccessor.keyBuilder().idealStates("MyDB"));
+  }
+
+  private void verifySwapInstance(HelixDataAccessor dataAccessor, String 
resourceName,
+      IdealState oldIs, String oldInstance, String newInstance, boolean 
isFullAuto) {
+    IdealState newIs = 
dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates(resourceName));
+    if (isFullAuto) {
+      // Full auto resource should not contain new instance as it's not live 
yet
+      for (String key : newIs.getRecord().getMapFields().keySet()) {
+        
Assert.assertFalse(newIs.getRecord().getMapField(key).keySet().contains(newInstance));
+      }
+
+      for (String key : newIs.getRecord().getListFields().keySet()) {
+        
Assert.assertFalse(newIs.getRecord().getListField(key).contains(newInstance));
+      }
+    } else {
+      verifyIdealStateWithSwappedInstance(oldIs, newIs, oldInstance, 
newInstance);
+    }
+  }
+
+  private void verifyIdealStateWithSwappedInstance(IdealState oldIs, 
IdealState newIs,
+      String oldInstance, String newInstance) {
 
-    for (String key : idealStateOld1.getMapFields().keySet()) {
-      for (String host : idealStateOld1.getMapField(key).keySet()) {
-        if (host.equals(instanceName)) {
-          Assert.assertTrue(idealStateOld1.getMapField(key).get(instanceName)
-              .equals(is1.getRecord().getMapField(key).get(instanceName2)));
+    // Verify map fields
+    for (String key : oldIs.getRecord().getMapFields().keySet()) {
+      for (String host : oldIs.getRecord().getMapField(key).keySet()) {
+        if (host.equals(oldInstance)) {
+          Assert.assertTrue(oldIs.getRecord().getMapField(key).get(oldInstance)
+              .equals(newIs.getRecord().getMapField(key).get(newInstance)));
         } else {
-          Assert.assertTrue(idealStateOld1.getMapField(key).get(host)
-              .equals(is1.getRecord().getMapField(key).get(host)));
+          Assert.assertTrue(oldIs.getRecord().getMapField(key).get(host)
+              .equals(newIs.getRecord().getMapField(key).get(host)));
         }
       }
     }
 
-    for (String key : idealStateOld1.getListFields().keySet()) {
-      Assert.assertEquals(idealStateOld1.getListField(key).size(), 
is1.getRecord()
+    // verify list fields
+    for (String key : oldIs.getRecord().getListFields().keySet()) {
+      Assert.assertEquals(oldIs.getRecord().getListField(key).size(), 
newIs.getRecord()
           .getListField(key).size());
-      for (int i = 0; i < idealStateOld1.getListField(key).size(); i++) {
-        String host = idealStateOld1.getListField(key).get(i);
-        String newHost = is1.getRecord().getListField(key).get(i);
-        if (host.equals(instanceName)) {
-          Assert.assertTrue(newHost.equals(instanceName2));
+      for (int i = 0; i < oldIs.getRecord().getListField(key).size(); i++) {
+        String host = oldIs.getRecord().getListField(key).get(i);
+        String newHost = newIs.getRecord().getListField(key).get(i);
+        if (host.equals(oldInstance)) {
+          Assert.assertTrue(newHost.equals(newInstance));
         } else {
-          // System.out.println(key + " " + i+ " " + host + " "+newHost);
-          // System.out.println(idealStateOld1.getListField(key));
-          // System.out.println(is1.getRecord().getListField(key));
-
           Assert.assertTrue(host.equals(newHost));
         }
       }
     }
   }
+
 }

Reply via email to