Repository: helix Updated Branches: refs/heads/helix-0.6.x 3a4ff21b4 -> 86b2b25ac
[HELIX-575] Should not send FINALIZED callback when a bucketized resource is removed, rb=32032 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/86b2b25a Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/86b2b25a Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/86b2b25a Branch: refs/heads/helix-0.6.x Commit: 86b2b25acbe26286a52881c11539d343130bc4fa Parents: 3a4ff21 Author: zzhang <[email protected]> Authored: Fri Mar 13 00:48:11 2015 -0700 Committer: zzhang <[email protected]> Committed: Fri Mar 13 00:48:11 2015 -0700 ---------------------------------------------------------------------- .../helix/manager/zk/CallbackHandler.java | 13 +- .../integration/TestBucketizedResource.java | 193 +++++++++++++------ .../integration/ZkIntegrationTestBase.java | 3 + .../manager/ClusterControllerManager.java | 4 + 4 files changed, 149 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/86b2b25a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index 65fe2f9..fd59ecc 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -378,17 +378,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener if (parentPath != null && parentPath.startsWith(_path)) { NotificationContext changeContext = new NotificationContext(_manager); - if (currentChilds == null) { - // parentPath has been removed - if (parentPath.equals(_path)) { - // _path has been removed, remove this listener - _manager.removeListener(_propertyKey, _listener); - } - changeContext.setType(NotificationContext.Type.FINALIZE); + if (currentChilds == null && parentPath.equals(_path)) { + // _path has been removed, remove this listener + // removeListener will call handler.reset(), which in turn call invoke() on FINALIZE type + _manager.removeListener(_propertyKey, _listener); } else { changeContext.setType(NotificationContext.Type.CALLBACK); + invoke(changeContext); } - invoke(changeContext); } } catch (Exception e) { String msg = http://git-wip-us.apache.org/repos/asf/helix/blob/86b2b25a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java index 9b72e41..a29ca0d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java @@ -23,16 +23,17 @@ import java.util.Arrays; import java.util.Date; import java.util.List; +import org.apache.helix.ExternalViewChangeListener; import org.apache.helix.HelixAdmin; +import org.apache.helix.NotificationContext; import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; -import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.NotificationContext.Type; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.tools.ClusterStateVerifier; @@ -43,48 +44,58 @@ import org.testng.Assert; import org.testng.annotations.Test; public class TestBucketizedResource extends ZkIntegrationTestBase { + + private void setupCluster(String clusterName, List<String> instanceNames, String dbName, + int replica, int partitions, int bucketSize) { + _gSetupTool.addCluster(clusterName, true); + _gSetupTool.addInstancesToCluster(clusterName, + instanceNames.toArray(new String[instanceNames.size()])); + + // add a bucketized resource + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + ZNRecord idealStateRec = + DefaultIdealStateCalculator.calculateIdealState(instanceNames, partitions, replica - 1, + dbName, + "MASTER", "SLAVE"); + IdealState idealState = new IdealState(idealStateRec); + idealState.setBucketSize(bucketSize); + idealState.setStateModelDefRef("MasterSlave"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + idealState.setReplicas(Integer.toString(replica)); + accessor.setProperty(keyBuilder.idealStates(dbName), idealState); + + } + @Test() - public void testBucketizedResource() throws Exception { + public void testBucketizedResource() { // Logger.getRootLogger().setLevel(Level.INFO); String className = TestHelper.getTestClassName(); String methodName = TestHelper.getTestMethodName(); String clusterName = className + "_" + methodName; + List<String> instanceNames = + Arrays.asList("localhost_12918", "localhost_12919", "localhost_12920", "localhost_12921", "localhost_12922"); + int n = instanceNames.size(); + String dbName = "TestDB0"; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); MockParticipantManager[] participants = new MockParticipantManager[5]; - // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); - - TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port - "localhost", // participant name prefix - "TestDB", // resource name prefix - 1, // resources - 10, // partitions per resource - 5, // number of nodes - 3, // replicas - "MasterSlave", true); // do rebalance - - ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); - ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor); - // String idealStatePath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, - // "TestDB0"); - Builder keyBuilder = accessor.keyBuilder(); - IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0")); - idealState.setBucketSize(1); - accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState); - - ClusterControllerManager controller = - new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + + setupCluster(clusterName, instanceNames, dbName, 3, 10, 1); + + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); + + ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName); controller.syncStart(); // start participants - for (int i = 0; i < 5; i++) { - String instanceName = "localhost_" + (12918 + i); - - participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + for (int i = 0; i < n; i++) { + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(i)); participants[i].syncStart(); } - PropertyKey evKey = accessor.keyBuilder().externalView("TestDB0"); + PropertyKey evKey = accessor.keyBuilder().externalView(dbName); boolean result = ClusterStateVerifier @@ -114,7 +125,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase { // clean up controller.syncStop(); - for (int i = 0; i < 5; i++) { + for (int i = 0; i < n; i++) { participants[i].syncStop(); } @@ -122,44 +133,28 @@ public class TestBucketizedResource extends ZkIntegrationTestBase { } @Test - public void testBounceDisableAndDrop() throws Exception { + public void testBounceDisableAndDrop() { String className = TestHelper.getTestClassName(); String methodName = TestHelper.getTestMethodName(); String clusterName = className + "_" + methodName; String dbName = "TestDB0"; - int n = 5; - int r = 3; List<String> instanceNames = Arrays.asList("localhost_0", "localhost_1", "localhost_2", "localhost_3", "localhost_4"); + int n = instanceNames.size(); System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - // create cluster and add nodes to cluster - MockParticipantManager[] participants = new MockParticipantManager[n]; - _gSetupTool.addCluster(clusterName, true); - _gSetupTool.addInstancesToCluster(clusterName, - instanceNames.toArray(new String[instanceNames.size()])); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - // add a bucketized resource - ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); - ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor); - Builder keyBuilder = accessor.keyBuilder(); - ZNRecord idealStateRec = - DefaultIdealStateCalculator.calculateIdealState(instanceNames, 10, r - 1, dbName, "MASTER", - "SLAVE"); - IdealState idealState = new IdealState(idealStateRec); - idealState.setBucketSize(2); - idealState.setStateModelDefRef("MasterSlave"); - idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - idealState.setReplicas(Integer.toString(r)); - accessor.setProperty(keyBuilder.idealStates(dbName), idealState); + setupCluster(clusterName, instanceNames, dbName, 3, 10, 2); // start controller - ClusterControllerManager controller = - new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName); controller.syncStart(); // start participants + MockParticipantManager[] participants = new MockParticipantManager[n]; for (int i = 0; i < n; i++) { participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(i)); participants[i].syncStart(); @@ -184,7 +179,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase { String path = keyBuilder.currentState(instanceNames.get(0), participants[0].getSessionId(), dbName) .getPath(); - ZNRecord record = baseAccessor.get(path, null, 0); + ZNRecord record = _baseAccessor.get(path, null, 0); Assert.assertTrue(record.getMapFields().size() == 0); // disable the bucketize resource @@ -204,7 +199,7 @@ public class TestBucketizedResource extends ZkIntegrationTestBase { // make sure external-view is cleaned up path = keyBuilder.externalView(dbName).getPath(); - result = baseAccessor.exists(path, 0); + result = _baseAccessor.exists(path, 0); Assert.assertFalse(result); // clean up @@ -213,6 +208,92 @@ public class TestBucketizedResource extends ZkIntegrationTestBase { participant.syncStop(); } System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + class TestExternalViewListener implements ExternalViewChangeListener { + int cbCnt = 0; + + @Override + public void onExternalViewChange(List<ExternalView> externalViewList, + NotificationContext changeContext) { + if (changeContext.getType() == Type.CALLBACK) { + cbCnt++; + } + } + + } + + @Test + public void testListenerOnBucketizedResource() throws Exception { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + String dbName = "TestDB0"; + List<String> instanceNames = + Arrays.asList("localhost_0", "localhost_1", "localhost_2", "localhost_3", "localhost_4"); + int n = instanceNames.size(); + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + setupCluster(clusterName, instanceNames, dbName, 3, 10, 2); + + // start controller + ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName); + controller.syncStart(); + + // start participants + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < n; i++) { + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(i)); + participants[i].syncStart(); + } + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // add an external view listener + TestExternalViewListener listener = new TestExternalViewListener(); + controller.addExternalViewChangeListener(listener); + + // remove "TestDB0" + _gSetupTool.dropResourceFromCluster(clusterName, dbName); + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + // wait callback to finish + Thread.sleep(100); + listener.cbCnt = 0; + + // add a new db + String newDbName = "TestDB1"; + int r = 3; + ZNRecord idealStateRec = + DefaultIdealStateCalculator.calculateIdealState(instanceNames, 10, r - 1, newDbName, + "MASTER", "SLAVE"); + IdealState idealState = new IdealState(idealStateRec); + idealState.setBucketSize(2); + idealState.setStateModelDefRef("MasterSlave"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + idealState.setReplicas(Integer.toString(r)); + accessor.setProperty(keyBuilder.idealStates(newDbName), idealState); + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + Assert.assertTrue(listener.cbCnt > 0); + + // clean up + controller.syncStop(); + for (MockParticipantManager participant : participants) { + participant.syncStop(); + } + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } } http://git-wip-us.apache.org/repos/asf/helix/blob/86b2b25a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java index 8cb697b..22696c3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/ZkIntegrationTestBase.java @@ -25,6 +25,7 @@ import org.I0Itec.zkclient.ZkServer; import org.apache.helix.ConfigAccessor; import org.apache.helix.model.ConfigScope; import org.apache.helix.model.builder.ConfigScopeBuilder; +import org.apache.helix.BaseDataAccessor; import org.apache.helix.TestHelper; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; @@ -46,6 +47,7 @@ public class ZkIntegrationTestBase { protected static ZkServer _zkServer; protected static ZkClient _gZkClient; protected static ClusterSetup _gSetupTool; + protected static BaseDataAccessor<ZNRecord> _baseAccessor; public static final String ZK_ADDR = "localhost:2183"; protected static final String CLUSTER_PREFIX = "CLUSTER"; @@ -67,6 +69,7 @@ public class ZkIntegrationTestBase { _gZkClient = new ZkClient(ZK_ADDR); _gZkClient.setZkSerializer(new ZNRecordSerializer()); _gSetupTool = new ClusterSetup(ZK_ADDR); + _baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); } @AfterSuite http://git-wip-us.apache.org/repos/asf/helix/blob/86b2b25a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java index b8f0f2b..9e10771 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java @@ -36,6 +36,10 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable private final CountDownLatch _stopCountDown = new CountDownLatch(1); private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1); + public ClusterControllerManager(String zkAddr, String clusterName) { + this(zkAddr, clusterName, "controller"); + } + public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) { super(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr); }
