Properly remove clusters after each test, and clean up duplicated codes in tests and move them into base test classes.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c0d5792b Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c0d5792b Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c0d5792b Branch: refs/heads/master Commit: c0d5792b745c67b6fee56ba79df02be89d1f049e Parents: fe97055 Author: Lei Xia <[email protected]> Authored: Thu Jun 7 17:15:54 2018 -0700 Committer: Lei Xia <[email protected]> Committed: Fri Jul 13 11:20:49 2018 -0700 ---------------------------------------------------------------------- .../BestPossibleExternalViewVerifier.java | 13 +- .../java/org/apache/helix/ZkUnitTestBase.java | 442 +----------- .../org/apache/helix/common/ZkTestBase.java | 691 +++++++++++++++++++ .../stages/TestMessageThrottleStage.java | 4 +- .../stages/TestRebalancePipeline.java | 8 +- .../SinglePartitionLeaderStandByTest.java | 5 +- .../helix/integration/TestAddClusterV2.java | 23 +- .../TestAddNodeAfterControllerStart.java | 6 +- .../TestAddStateModelFactoryAfterConnect.java | 5 +- .../TestAlertingRebalancerFailure.java | 39 +- .../helix/integration/TestBasicSpectator.java | 4 +- .../integration/TestBatchMessageHandling.java | 6 +- .../integration/TestBucketizedResource.java | 4 +- .../integration/TestCMWithFailParticipant.java | 4 +- .../integration/TestCarryOverBadCurState.java | 6 +- .../integration/TestCleanupExternalView.java | 5 +- .../helix/integration/TestClusterStartsup.java | 22 +- .../TestCorrectnessOnConnectivityLoss.java | 12 +- .../apache/helix/integration/TestDisable.java | 4 +- .../integration/TestDisableExternalView.java | 4 +- .../helix/integration/TestDisableResource.java | 2 + .../integration/TestDistributedCMMain.java | 4 +- .../TestDistributedClusterController.java | 4 +- .../apache/helix/integration/TestDriver.java | 5 +- .../org/apache/helix/integration/TestDrop.java | 4 +- .../helix/integration/TestDropResource.java | 8 +- .../integration/TestEnableCompression.java | 4 +- .../TestEnablePartitionDuringDisable.java | 4 +- .../integration/TestEntropyFreeNodeBounce.java | 1 + .../helix/integration/TestErrorPartition.java | 4 +- .../helix/integration/TestExpandCluster.java | 22 +- .../integration/TestExternalViewUpdates.java | 4 +- .../integration/TestHelixCustomCodeRunner.java | 6 +- .../helix/integration/TestHelixInstanceTag.java | 12 +- .../TestHelixUsingDifferentParams.java | 4 +- .../TestInvalidResourceRebalance.java | 1 + .../helix/integration/TestNullReplica.java | 5 +- .../TestPartitionLevelTransitionConstraint.java | 5 +- .../TestPartitionMovementThrottle.java | 36 +- .../helix/integration/TestPauseSignal.java | 4 +- .../TestRebalancerPersistAssignments.java | 43 +- .../TestReelectedPipelineCorrectness.java | 1 + .../helix/integration/TestRenamePartition.java | 4 +- .../helix/integration/TestResetInstance.java | 4 +- .../integration/TestResetPartitionState.java | 5 +- .../helix/integration/TestResetResource.java | 5 +- .../integration/TestResourceGroupEndtoEnd.java | 6 +- .../TestResourceWithSamePartitionKey.java | 1 + .../helix/integration/TestSchemataSM.java | 4 +- .../TestSessionExpiryInTransition.java | 4 +- .../TestStandAloneCMSessionExpiry.java | 4 +- ...estStartMultipleControllersWithSameName.java | 5 +- .../TestStateTransitionCancellation.java | 23 +- .../TestStateTransitionThrottle.java | 8 +- .../helix/integration/TestStatusUpdate.java | 4 +- .../helix/integration/TestSwapInstance.java | 13 +- .../TestSyncSessionToController.java | 4 +- .../TestWeightBasedRebalanceUtil.java | 16 +- .../helix/integration/TestZkConnectionLost.java | 43 +- .../integration/common/IntegrationTest.java | 38 - .../common/ZkIntegrationTestBase.java | 322 --------- .../common/ZkStandAloneCMTestBase.java | 36 +- .../TestClusterDataCacheSelectiveUpdate.java | 12 +- .../controller/TestClusterMaintenanceMode.java | 17 +- .../TestControllerLeadershipChange.java | 4 +- .../controller/TestControllerLiveLock.java | 1 + .../TestSkipBestPossibleCalculation.java | 4 +- .../TestDistributedControllerManager.java | 4 +- .../manager/TestHelixDataAccessor.java | 7 +- .../manager/TestParticipantManager.java | 4 +- .../integration/messaging/TestBatchMessage.java | 15 +- .../messaging/TestBatchMessageWrapper.java | 1 + .../messaging/TestMessageThrottle.java | 4 +- .../messaging/TestMessageThrottle2.java | 4 +- .../messaging/TestP2PMessageSemiAuto.java | 19 +- .../paticipant/TestInstanceAutoJoin.java | 4 +- .../paticipant/TestNonOfflineInitState.java | 21 +- .../paticipant/TestRestartParticipant.java | 5 +- .../paticipant/TestStateTransitionTimeout.java | 18 +- .../TestStateTransitionTimeoutWithResource.java | 30 +- .../TestCrushAutoRebalance.java | 81 +-- .../TestCrushAutoRebalanceNonRack.java | 73 +- ...rushAutoRebalanceTopoplogyAwareDisabled.java | 20 +- .../TestDelayedAutoRebalance.java | 48 +- ...elayedAutoRebalanceWithDisabledInstance.java | 54 +- .../TestDelayedAutoRebalanceWithRackaware.java | 11 +- .../PartitionMigration/TestExpandCluster.java | 6 +- .../TestFullAutoMigration.java | 4 +- .../TestPartitionMigrationBase.java | 43 +- .../rebalancer/TestAutoIsWithEmptyMap.java | 5 +- .../rebalancer/TestAutoRebalance.java | 30 +- .../TestAutoRebalancePartitionLimit.java | 16 +- .../TestAutoRebalanceWithDisabledInstance.java | 18 +- ...MaintenanceModeWhenReachingMaxPartition.java | 9 +- ...ceModeWhenReachingOfflineInstancesLimit.java | 10 +- .../rebalancer/TestCustomIdealState.java | 4 +- .../TestCustomizedIdealStateRebalancer.java | 8 +- .../rebalancer/TestFullAutoNodeTagging.java | 9 + .../rebalancer/TestMixedModeAutoRebalance.java | 8 +- .../rebalancer/TestSemiAutoRebalance.java | 24 +- .../rebalancer/TestZeroReplicaAvoidance.java | 19 +- .../spectator/TestRoutingTableProvider.java | 4 +- ...stRoutingTableProviderFromCurrentStates.java | 47 +- .../TestRoutingTableProviderFromTargetEV.java | 36 +- ...TestRoutingTableProviderPeriodicRefresh.java | 4 +- .../spectator/TestRoutingTableSnapshot.java | 26 +- .../helix/integration/task/TaskTestBase.java | 11 +- .../integration/task/TestBatchAddJobs.java | 13 +- .../task/TestIndependentTaskRebalancer.java | 10 +- .../helix/integration/task/TestJobFailure.java | 10 +- .../task/TestJobFailureDependence.java | 8 +- .../task/TestJobFailureHighThreshold.java | 9 +- .../task/TestJobFailureTaskNotStarted.java | 20 +- .../helix/integration/task/TestJobTimeout.java | 9 +- .../task/TestJobTimeoutTaskNotStarted.java | 10 +- .../task/TestRebalanceRunningTask.java | 10 +- .../task/TestRunJobsWithMissingTarget.java | 4 +- .../integration/task/TestTaskAssignment.java | 2 +- .../task/TestTaskRebalancerParallel.java | 4 - .../integration/task/TestTaskThreadLeak.java | 4 +- .../integration/task/TestTaskThrottling.java | 6 +- .../task/TestTaskWithInstanceDisabled.java | 2 +- .../integration/task/TestUserContentStore.java | 9 +- .../helix/manager/zk/TestHandleNewSession.java | 5 +- .../manager/zk/TestZkBaseDataAccessor.java | 91 +-- .../zk/TestZkManagerFlappingDetection.java | 4 +- .../handling/TestBatchMessageModeConfigs.java | 14 +- .../handling/TestResourceThreadpoolSize.java | 24 +- .../TestClusterStatusMonitorLifecycle.java | 23 +- .../mbeans/TestClusterAggregateMetrics.java | 4 +- .../mbeans/TestDisableResourceMbean.java | 1 + .../mbeans/TestDropResourceMetricsReset.java | 1 + .../mbeans/TestResetClusterMetrics.java | 1 + .../participant/TestDistControllerElection.java | 16 +- .../TestDistControllerStateModel.java | 8 + .../helix/spectator/TestRoutingDataCache.java | 14 +- .../helix/task/TaskSynchronizedTestBase.java | 66 +- ...signableInstanceManagerControllerSwitch.java | 2 +- .../helix/task/TestJobStateOnCreation.java | 8 +- .../helix/task/TestSemiAutoStateTransition.java | 12 +- .../apache/helix/tools/TestClusterSetup.java | 11 +- .../apache/helix/tools/TestHelixAdminCli.java | 4 +- 142 files changed, 1561 insertions(+), 1719 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java index 4abfd07..fa5d29f 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java @@ -82,18 +82,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { _expectLiveInstances = expectLiveInstances; _clusterDataCache = new ClusterDataCache(); } - - public static void main (String [] args) { - Set<String> resources = Collections.singleton("SyncColoTestDB"); - BestPossibleExternalViewVerifier verifier = - new BestPossibleExternalViewVerifier.Builder("ESPRESSO_MT1") - .setZkAddr("zk-ltx1-espresso.stg.linkedin.com:12913") - .setResources(resources) - .build(); - - verifier.verify(); - } - + public static class Builder { private String _clusterName; private Map<String, Map<String, String>> _errStates; http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java index 483f0af..5757b24 100644 --- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java @@ -19,445 +19,15 @@ package org.apache.helix; * under the License. */ -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.I0Itec.zkclient.IZkStateListener; -import org.I0Itec.zkclient.ZkConnection; -import org.I0Itec.zkclient.ZkServer; -import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; -import org.apache.helix.controller.pipeline.Pipeline; -import org.apache.helix.controller.pipeline.Stage; -import org.apache.helix.controller.pipeline.StageContext; -import org.apache.helix.controller.stages.ClusterEvent; -import org.apache.helix.manager.zk.ZKHelixAdmin; -import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.manager.zk.ZkClient; -import org.apache.helix.model.CurrentState; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.IdealState.RebalanceMode; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.Message; -import org.apache.helix.model.Message.Attributes; -import org.apache.helix.model.Message.MessageType; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier; -import org.apache.helix.tools.StateModelConfigGenerator; -import org.apache.helix.util.ZKClientPool; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooKeeper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.AssertJUnit; -import org.testng.annotations.AfterSuite; -import org.testng.annotations.BeforeSuite; +import org.apache.helix.common.ZkTestBase; // TODO merge code with ZkIntegrationTestBase -public class ZkUnitTestBase { - private static Logger LOG = LoggerFactory.getLogger(ZkUnitTestBase.class); - protected static ZkServer _zkServer = null; - protected static ZkClient _gZkClient; - - public static final String ZK_ADDR = "localhost:2185"; - protected static final String CLUSTER_PREFIX = "CLUSTER"; - protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER"; - - @BeforeSuite(alwaysRun = true) - public void beforeSuite() throws Exception { - // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends - System.setProperty("zookeeper.4lw.commands.whitelist", "*"); - System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, "3000"); - - _zkServer = TestHelper.startZkServer(ZK_ADDR); - AssertJUnit.assertTrue(_zkServer != null); - ZKClientPool.reset(); - - // System.out.println("Number of open zkClient before ZkUnitTests: " - // + ZkClient.getNumberOfConnections()); - - _gZkClient = new ZkClient(ZK_ADDR); - _gZkClient.setZkSerializer(new ZNRecordSerializer()); - } - - @AfterSuite(alwaysRun = true) - public void afterSuite() { - _gZkClient.close(); - TestHelper.stopZkServer(_zkServer); - _zkServer = null; - - // System.out.println("Number of open zkClient after ZkUnitTests: " - // + ZkClient.getNumberOfConnections()); - - } - - protected String getShortClassName() { - String className = this.getClass().getName(); - return className.substring(className.lastIndexOf('.') + 1); - } - - protected String getCurrentLeader(ZkClient zkClient, String clusterName) { - String leaderPath = PropertyPathBuilder.controllerLeader(clusterName); - ZNRecord leaderRecord = zkClient.<ZNRecord> readData(leaderPath); - if (leaderRecord == null) { - return null; - } - - String leader = leaderRecord.getSimpleField(PropertyType.LEADER.toString()); - return leader; - } - - protected void stopCurrentLeader(ZkClient zkClient, String clusterName, - Map<String, Thread> threadMap, Map<String, HelixManager> managerMap) { - String leader = getCurrentLeader(zkClient, clusterName); - Assert.assertTrue(leader != null); - System.out.println("stop leader:" + leader + " in " + clusterName); - Assert.assertTrue(leader != null); - - HelixManager manager = managerMap.remove(leader); - Assert.assertTrue(manager != null); - manager.disconnect(); - - Thread thread = threadMap.remove(leader); - Assert.assertTrue(thread != null); - thread.interrupt(); - - boolean isNewLeaderElected = false; - try { - // Thread.sleep(2000); - for (int i = 0; i < 5; i++) { - Thread.sleep(1000); - String newLeader = getCurrentLeader(zkClient, clusterName); - if (!newLeader.equals(leader)) { - isNewLeaderElected = true; - System.out.println("new leader elected: " + newLeader + " in " + clusterName); - break; - } - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - if (isNewLeaderElected == false) { - System.out.println("fail to elect a new leader elected in " + clusterName); - } - AssertJUnit.assertTrue(isNewLeaderElected); - } - - public void verifyInstance(ZkClient zkClient, String clusterName, String instance, - boolean wantExists) { - // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName); - String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName); - String instanceConfigPath = instanceConfigsPath + "/" + instance; - String instancePath = PropertyPathBuilder.instance(clusterName, instance); - AssertJUnit.assertEquals(wantExists, zkClient.exists(instanceConfigPath)); - AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath)); - } - - public void verifyResource(ZkClient zkClient, String clusterName, String resource, - boolean wantExists) { - String resourcePath = PropertyPathBuilder.idealState(clusterName, resource); - AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath)); - } - - public void verifyEnabled(ZkClient zkClient, String clusterName, String instance, - boolean wantEnabled) { - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); - Builder keyBuilder = accessor.keyBuilder(); - - InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance)); - AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled()); - } - - public void verifyReplication(ZkClient zkClient, String clusterName, String resource, int repl) { - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); - Builder keyBuilder = accessor.keyBuilder(); - - IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource)); - for (String partitionName : idealState.getPartitionSet()) { - if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) { - AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionName).size()); - } else if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) { - AssertJUnit.assertEquals(repl, idealState.getInstanceStateMap(partitionName).size()); - } - } - } - - protected void simulateSessionExpiry(ZkConnection zkConnection) throws IOException, - InterruptedException { - ZooKeeper oldZookeeper = zkConnection.getZookeeper(); - LOG.info("Old sessionId = " + oldZookeeper.getSessionId()); - - Watcher watcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - LOG.info("In New connection, process event:" + event); - } - }; - - ZooKeeper newZookeeper = - new ZooKeeper(zkConnection.getServers(), oldZookeeper.getSessionTimeout(), watcher, - oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd()); - LOG.info("New sessionId = " + newZookeeper.getSessionId()); - // Thread.sleep(3000); - newZookeeper.close(); - Thread.sleep(10000); - oldZookeeper = zkConnection.getZookeeper(); - LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId()); - } - - protected void simulateSessionExpiry(ZkClient zkClient) throws IOException, InterruptedException { - IZkStateListener listener = new IZkStateListener() { - @Override - public void handleStateChanged(KeeperState state) throws Exception { - LOG.info("In Old connection, state changed:" + state); - } - - @Override - public void handleNewSession() throws Exception { - LOG.info("In Old connection, new session"); - } - - @Override - public void handleSessionEstablishmentError(Throwable var1) throws Exception { - } - }; - zkClient.subscribeStateChanges(listener); - ZkConnection connection = ((ZkConnection) zkClient.getConnection()); - ZooKeeper oldZookeeper = connection.getZookeeper(); - LOG.info("Old sessionId = " + oldZookeeper.getSessionId()); - - Watcher watcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - LOG.info("In New connection, process event:" + event); - } - }; - - ZooKeeper newZookeeper = - new ZooKeeper(connection.getServers(), oldZookeeper.getSessionTimeout(), watcher, - oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd()); - LOG.info("New sessionId = " + newZookeeper.getSessionId()); - // Thread.sleep(3000); - newZookeeper.close(); - Thread.sleep(10000); - connection = (ZkConnection) zkClient.getConnection(); - oldZookeeper = connection.getZookeeper(); - LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId()); - } - - protected void setupStateModel(String clusterName) { - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); - Builder keyBuilder = accessor.keyBuilder(); - - StateModelDefinition masterSlave = - new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); - accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave); - - StateModelDefinition leaderStandby = - new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby()); - accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby); - - StateModelDefinition onlineOffline = - new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()); - accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline); - - } - - protected List<IdealState> setupIdealState(String clusterName, int[] nodes, String[] resources, - int partitions, int replicas) { - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); - Builder keyBuilder = accessor.keyBuilder(); - - List<IdealState> idealStates = new ArrayList<IdealState>(); - List<String> instances = new ArrayList<String>(); - for (int i : nodes) { - instances.add("localhost_" + i); - } - - for (String resourceName : resources) { - IdealState idealState = new IdealState(resourceName); - for (int p = 0; p < partitions; p++) { - List<String> value = new ArrayList<String>(); - for (int r = 0; r < replicas; r++) { - int n = nodes[(p + r) % nodes.length]; - value.add("localhost_" + n); - } - idealState.getRecord().setListField(resourceName + "_" + p, value); - } - - idealState.setReplicas(Integer.toString(replicas)); - idealState.setStateModelDefRef("MasterSlave"); - idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO); - idealState.setNumPartitions(partitions); - idealStates.add(idealState); - - // System.out.println(idealState); - accessor.setProperty(keyBuilder.idealStates(resourceName), idealState); - } - return idealStates; - } - - protected void setupLiveInstances(String clusterName, int[] liveInstances) { - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); - Builder keyBuilder = accessor.keyBuilder(); - - for (int i = 0; i < liveInstances.length; i++) { - String instance = "localhost_" + liveInstances[i]; - LiveInstance liveInstance = new LiveInstance(instance); - liveInstance.setSessionId("session_" + liveInstances[i]); - liveInstance.setHelixVersion("0.0.0"); - accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance); - } - } - - protected void setupInstances(String clusterName, int[] instances) { - HelixAdmin admin = new ZKHelixAdmin(_gZkClient); - for (int i = 0; i < instances.length; i++) { - String instance = "localhost_" + instances[i]; - InstanceConfig instanceConfig = new InstanceConfig(instance); - instanceConfig.setHostName("localhost"); - instanceConfig.setPort("" + instances[i]); - instanceConfig.setInstanceEnabled(true); - admin.addInstance(clusterName, instanceConfig); - } - } - - protected void runPipeline(ClusterEvent event, Pipeline pipeline) { - try { - pipeline.handle(event); - pipeline.finish(); - } catch (Exception e) { - LOG.error("Exception while executing pipeline:" + pipeline - + ". Will not continue to next pipeline", e); - } - } - - protected void runStage(ClusterEvent event, Stage stage) throws Exception { - StageContext context = new StageContext(); - stage.init(context); - stage.preProcess(); - - // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in - // execute() function call - // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving runStage() - // to a shared library - if (stage instanceof AbstractAsyncBaseStage) { - ((AbstractAsyncBaseStage) stage).execute(event); - } else { - stage.process(event); - } - stage.postProcess(); - } - - protected Message createMessage(MessageType type, String msgId, String fromState, String toState, - String resourceName, String tgtName) { - Message msg = new Message(type.toString(), msgId); - msg.setFromState(fromState); - msg.setToState(toState); - msg.getRecord().setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName); - msg.setTgtName(tgtName); - return msg; - } - - /** - * Poll for the existence (or lack thereof) of a specific Helix property - * @param clazz the HelixProeprty subclass - * @param accessor connected HelixDataAccessor - * @param key the property key to look up - * @param shouldExist true if the property should exist, false otherwise - * @return the property if found, or null if it does not exist - */ - protected <T extends HelixProperty> T pollForProperty(Class<T> clazz, HelixDataAccessor accessor, - PropertyKey key, boolean shouldExist) throws InterruptedException { - final int POLL_TIMEOUT = 5000; - final int POLL_INTERVAL = 50; - T property = accessor.getProperty(key); - int timeWaited = 0; - while (((shouldExist && property == null) || (!shouldExist && property != null)) - && timeWaited < POLL_TIMEOUT) { - Thread.sleep(POLL_INTERVAL); - timeWaited += POLL_INTERVAL; - property = accessor.getProperty(key); - } - return property; - } - - /** - * Ensures that external view and current state are empty - */ - protected static class EmptyZkVerifier implements ZkVerifier { - private final String _clusterName; - private final String _resourceName; - private final ZkClient _zkClient; - - /** - * Instantiate the verifier - * @param clusterName the cluster to verify - * @param resourceName the resource to verify - */ - public EmptyZkVerifier(String clusterName, String resourceName) { - _clusterName = clusterName; - _resourceName = resourceName; - _zkClient = ZKClientPool.getZkClient(ZK_ADDR); - } - - @Override - public boolean verify() { - BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); - HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName)); - - // verify external view empty - if (externalView != null) { - for (String partition : externalView.getPartitionSet()) { - Map<String, String> stateMap = externalView.getStateMap(partition); - if (stateMap != null && !stateMap.isEmpty()) { - LOG.error("External view not empty for " + partition); - return false; - } - } - } - - // verify current state empty - List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances()); - for (String participant : liveParticipants) { - List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant)); - for (String sessionId : sessionIds) { - CurrentState currentState = - accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName)); - Map<String, String> partitionStateMap = currentState.getPartitionStateMap(); - if (partitionStateMap != null && !partitionStateMap.isEmpty()) { - LOG.error("Current state not empty for " + participant); - return false; - } - } - } - return true; - } - - @Override - public ZkClient getZkClient() { - return _zkClient; - } +public class ZkUnitTestBase extends ZkTestBase { - @Override - public String getClusterName() { - return _clusterName; + protected void deleteCluster(String clusterName) { + String namespace = "/" + clusterName; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursively(namespace); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java new file mode 100644 index 0000000..7d98119 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -0,0 +1,691 @@ +package org.apache.helix.common; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import org.I0Itec.zkclient.IZkStateListener; +import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.ZkServer; +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.SystemPropertyKeys; +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; +import org.apache.helix.controller.pipeline.Pipeline; +import org.apache.helix.controller.pipeline.Stage; +import org.apache.helix.controller.pipeline.StageContext; +import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.stages.AttributeName; +import org.apache.helix.controller.stages.ClusterEvent; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ConfigScope; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; +import org.apache.helix.model.OnlineOfflineSMD; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.model.builder.ConfigScopeBuilder; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.tools.StateModelConfigGenerator; +import org.apache.helix.util.ZKClientPool; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.AssertJUnit; +import org.testng.ITestContext; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeSuite; + +public class ZkTestBase { + private static Logger LOG = LoggerFactory.getLogger(ZkTestBase.class); + + protected static ZkServer _zkServer; + protected static ZkClient _gZkClient; + protected static ClusterSetup _gSetupTool; + protected static BaseDataAccessor<ZNRecord> _baseAccessor; + + public static final String ZK_ADDR = "localhost:2183"; + protected static final String CLUSTER_PREFIX = "CLUSTER"; + protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER"; + protected final String CONTROLLER_PREFIX = "controller"; + protected final String PARTICIPANT_PREFIX = "localhost"; + + + @BeforeSuite + public void beforeSuite() throws Exception { + // TODO: use logging.properties file to config java.util.logging.Logger levels + java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger(""); + topJavaLogger.setLevel(Level.WARNING); + + // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); + System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, "3000"); + + _zkServer = TestHelper.startZkServer(ZK_ADDR); + AssertJUnit.assertTrue(_zkServer != null); + ZKClientPool.reset(); + + _gZkClient = new ZkClient(ZK_ADDR); + _gZkClient.setZkSerializer(new ZNRecordSerializer()); + _gSetupTool = new ClusterSetup(_gZkClient); + _baseAccessor = new ZkBaseDataAccessor<>(_gZkClient); + } + + @AfterSuite + public void afterSuite() { + ZKClientPool.reset(); + _gZkClient.close(); + TestHelper.stopZkServer(_zkServer); + } + + @BeforeMethod + public void beforeTest(Method testMethod, ITestContext testContext){ + long startTime = System.currentTimeMillis(); + System.out.println("START " + testMethod.getName() + " at " + new Date(startTime)); + testContext.setAttribute("StartTime", System.currentTimeMillis()); + } + + @AfterMethod + public void endTest(Method testMethod, ITestContext testContext) { + Long startTime = (Long) testContext.getAttribute("StartTime"); + long endTime = System.currentTimeMillis(); + System.out.println( + "END " + testMethod.getName() + " at " + new Date(endTime) + ", took: " + (endTime + - startTime) + "ms."); + } + + + protected String getShortClassName() { + return this.getClass().getSimpleName(); + } + + protected String getCurrentLeader(ZkClient zkClient, String clusterName) { + ZKHelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); + Builder keyBuilder = accessor.keyBuilder(); + + LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader()); + if (leader == null) { + return null; + } + return leader.getInstanceName(); + } + + protected void stopCurrentLeader(ZkClient zkClient, String clusterName, + Map<String, Thread> threadMap, Map<String, HelixManager> managerMap) { + String leader = getCurrentLeader(zkClient, clusterName); + Assert.assertTrue(leader != null); + System.out.println("stop leader:" + leader + " in " + clusterName); + Assert.assertTrue(leader != null); + + HelixManager manager = managerMap.remove(leader); + Assert.assertTrue(manager != null); + manager.disconnect(); + + Thread thread = threadMap.remove(leader); + Assert.assertTrue(thread != null); + thread.interrupt(); + + boolean isNewLeaderElected = false; + try { + // Thread.sleep(2000); + for (int i = 0; i < 5; i++) { + Thread.sleep(1000); + String newLeader = getCurrentLeader(zkClient, clusterName); + if (!newLeader.equals(leader)) { + isNewLeaderElected = true; + System.out.println("new leader elected: " + newLeader + " in " + clusterName); + break; + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (isNewLeaderElected == false) { + System.out.println("fail to elect a new leader elected in " + clusterName); + } + AssertJUnit.assertTrue(isNewLeaderElected); + } + + protected void enableHealthCheck(String clusterName) { + ConfigScope scope = new ConfigScopeBuilder().forCluster(clusterName).build(); + new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true); + } + + protected void enablePersistBestPossibleAssignment(ZkClient zkClient, String clusterName, + Boolean enabled) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setPersistBestPossibleAssignment(enabled); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + + protected void enablePersistIntermediateAssignment(ZkClient zkClient, String clusterName, + Boolean enabled) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setPersistIntermediateAssignment(enabled); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + + protected void enableTopologyAwareRebalance(ZkClient zkClient, String clusterName, + Boolean enabled) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setTopologyAwareEnabled(enabled); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + + protected void enableDelayRebalanceInCluster(ZkClient zkClient, String clusterName, + boolean enabled) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setDelayRebalaceEnabled(enabled); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + + protected void enableDelayRebalanceInInstance(ZkClient zkClient, String clusterName, + String instanceName, boolean enabled) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName); + instanceConfig.setDelayRebalanceEnabled(enabled); + configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig); + } + + protected void setDelayTimeInCluster(ZkClient zkClient, String clusterName, long delay) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setRebalanceDelayTime(delay); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + + protected IdealState createResourceWithDelayedRebalance(String clusterName, String db, + String stateModel, int numPartition, int replica, int minActiveReplica, long delay) { + return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica, + minActiveReplica, delay, AutoRebalanceStrategy.class.getName()); + } + + protected IdealState createResourceWithDelayedRebalance(String clusterName, String db, + String stateModel, int numPartition, int replica, int minActiveReplica, long delay, + String rebalanceStrategy) { + IdealState idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db); + if (idealState == null) { + _gSetupTool.addResourceToCluster(clusterName, db, numPartition, stateModel, + IdealState.RebalanceMode.FULL_AUTO + "", rebalanceStrategy); + } + + idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db); + idealState.setMinActiveReplicas(minActiveReplica); + if (!idealState.isDelayRebalanceEnabled()) { + idealState.setDelayRebalanceEnabled(true); + } + if (delay > 0) { + idealState.setRebalanceDelay(delay); + } + idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName()); + _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, db, idealState); + _gSetupTool.rebalanceStorageCluster(clusterName, db, replica); + idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db); + + return idealState; + } + + protected IdealState createIdealState(String resourceGroupName, String instanceGroupTag, + List<String> instanceNames, int numPartition, int replica, String rebalanceMode, + String stateModelDef) { + IdealState is = _gSetupTool + .createIdealStateForResourceGroup(resourceGroupName, instanceGroupTag, numPartition, + replica, rebalanceMode, stateModelDef); + + // setup initial partition->instance mapping. + int nodeIdx = 0; + int numNode = instanceNames.size(); + assert (numNode >= replica); + for (int i = 0; i < numPartition; i++) { + String partitionName = resourceGroupName + "_" + i; + for (int j = 0; j < replica; j++) { + is.setPartitionState(partitionName, instanceNames.get((nodeIdx + j) % numNode), + OnlineOfflineSMD.States.ONLINE.toString()); + } + nodeIdx++; + } + + return is; + } + + protected void createDBInSemiAuto(ClusterSetup clusterSetup, String clusterName, String dbName, + List<String> preferenceList, String stateModelDef, int numPartition, int replica) { + clusterSetup.addResourceToCluster(clusterName, dbName, numPartition, stateModelDef, + IdealState.RebalanceMode.SEMI_AUTO.toString()); + clusterSetup.rebalanceStorageCluster(clusterName, dbName, replica); + + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, dbName); + for (String p : is.getPartitionSet()) { + is.setPreferenceList(p, preferenceList); + } + clusterSetup.getClusterManagementTool().setResourceIdealState(clusterName, dbName, is); + } + + /** + * Validate there should be always minimal active replica and top state replica for each partition. + * Also make sure there is always some partitions with only active replica count. + */ + protected void validateMinActiveAndTopStateReplica(IdealState is, ExternalView ev, + int minActiveReplica, int numNodes) { + StateModelDefinition stateModelDef = + BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition(); + String topState = stateModelDef.getStatesPriorityList().get(0); + int replica = Integer.valueOf(is.getReplicas()); + + Map<String, Integer> stateCount = + stateModelDef.getStateCountMap(numNodes, replica); + Set<String> activeStates = stateCount.keySet(); + + for (String partition : is.getPartitionSet()) { + Map<String, String> assignmentMap = ev.getRecord().getMapField(partition); + Assert.assertNotNull(assignmentMap, + is.getResourceName() + "'s best possible assignment is null for partition " + partition); + Assert.assertTrue(!assignmentMap.isEmpty(), + is.getResourceName() + "'s partition " + partition + " has no best possible map in IS."); + + boolean hasTopState = false; + int activeReplica = 0; + for (String state : assignmentMap.values()) { + if (topState.equalsIgnoreCase(state)) { + hasTopState = true; + } + if (activeStates.contains(state)) { + activeReplica++; + } + } + + if (activeReplica < minActiveReplica) { + int a = 0; + } + + Assert.assertTrue(hasTopState, String.format("%s missing %s replica", partition, topState)); + Assert.assertTrue(activeReplica >= minActiveReplica, String + .format("%s has less active replica %d then required %d", partition, activeReplica, + minActiveReplica)); + } + } + + protected void runStage(HelixManager manager, ClusterEvent event, Stage stage) throws Exception { + event.addAttribute(AttributeName.helixmanager.name(), manager); + StageContext context = new StageContext(); + stage.init(context); + stage.preProcess(); + + // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in + // execute() function call + if (stage instanceof AbstractAsyncBaseStage) { + ((AbstractAsyncBaseStage) stage).execute(event); + } else { + stage.process(event); + } + stage.postProcess(); + } + + public void verifyInstance(ZkClient zkClient, String clusterName, String instance, + boolean wantExists) { + // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName); + String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName); + String instanceConfigPath = instanceConfigsPath + "/" + instance; + String instancePath = PropertyPathBuilder.instance(clusterName, instance); + AssertJUnit.assertEquals(wantExists, zkClient.exists(instanceConfigPath)); + AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath)); + } + + public void verifyResource(ZkClient zkClient, String clusterName, String resource, + boolean wantExists) { + String resourcePath = PropertyPathBuilder.idealState(clusterName, resource); + AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath)); + } + + public void verifyEnabled(ZkClient zkClient, String clusterName, String instance, + boolean wantEnabled) { + ZKHelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); + Builder keyBuilder = accessor.keyBuilder(); + + InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance)); + AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled()); + } + + public void verifyReplication(ZkClient zkClient, String clusterName, String resource, int repl) { + ZKHelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); + Builder keyBuilder = accessor.keyBuilder(); + + IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource)); + for (String partitionName : idealState.getPartitionSet()) { + if (idealState.getRebalanceMode() == IdealState.RebalanceMode.SEMI_AUTO) { + AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionName).size()); + } else if (idealState.getRebalanceMode() == IdealState.RebalanceMode.CUSTOMIZED) { + AssertJUnit.assertEquals(repl, idealState.getInstanceStateMap(partitionName).size()); + } + } + } + + protected void simulateSessionExpiry(ZkConnection zkConnection) throws IOException, + InterruptedException { + ZooKeeper oldZookeeper = zkConnection.getZookeeper(); + LOG.info("Old sessionId = " + oldZookeeper.getSessionId()); + + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + LOG.info("In New connection, process event:" + event); + } + }; + + ZooKeeper newZookeeper = + new ZooKeeper(zkConnection.getServers(), oldZookeeper.getSessionTimeout(), watcher, + oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd()); + LOG.info("New sessionId = " + newZookeeper.getSessionId()); + // Thread.sleep(3000); + newZookeeper.close(); + Thread.sleep(10000); + oldZookeeper = zkConnection.getZookeeper(); + LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId()); + } + + protected void simulateSessionExpiry(ZkClient zkClient) + throws IOException, InterruptedException, IOException { + IZkStateListener listener = new IZkStateListener() { + @Override + public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { + LOG.info("In Old connection, state changed:" + state); + } + + @Override + public void handleNewSession() throws Exception { + LOG.info("In Old connection, new session"); + } + + @Override + public void handleSessionEstablishmentError(Throwable var1) throws Exception { + } + }; + zkClient.subscribeStateChanges(listener); + ZkConnection connection = ((ZkConnection) zkClient.getConnection()); + ZooKeeper oldZookeeper = connection.getZookeeper(); + LOG.info("Old sessionId = " + oldZookeeper.getSessionId()); + + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + LOG.info("In New connection, process event:" + event); + } + }; + + ZooKeeper newZookeeper = + new ZooKeeper(connection.getServers(), oldZookeeper.getSessionTimeout(), watcher, + oldZookeeper.getSessionId(), oldZookeeper.getSessionPasswd()); + LOG.info("New sessionId = " + newZookeeper.getSessionId()); + // Thread.sleep(3000); + newZookeeper.close(); + Thread.sleep(10000); + connection = (ZkConnection) zkClient.getConnection(); + oldZookeeper = connection.getZookeeper(); + LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId()); + } + + protected void setupStateModel(String clusterName) { + ZKHelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + Builder keyBuilder = accessor.keyBuilder(); + + StateModelDefinition masterSlave = + new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); + accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave); + + StateModelDefinition leaderStandby = + new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby()); + accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby); + + StateModelDefinition onlineOffline = + new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()); + accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline); + + } + + protected Message createMessage(Message.MessageType type, String msgId, String fromState, String toState, + String resourceName, String tgtName) { + Message msg = new Message(type.toString(), msgId); + msg.setFromState(fromState); + msg.setToState(toState); + msg.getRecord().setSimpleField(Message.Attributes.RESOURCE_NAME.toString(), resourceName); + msg.setTgtName(tgtName); + return msg; + } + + protected List<IdealState> setupIdealState(String clusterName, int[] nodes, String[] resources, + int partitions, int replicas) { + ZKHelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + Builder keyBuilder = accessor.keyBuilder(); + + List<IdealState> idealStates = new ArrayList<IdealState>(); + List<String> instances = new ArrayList<String>(); + for (int i : nodes) { + instances.add("localhost_" + i); + } + + for (String resourceName : resources) { + IdealState idealState = new IdealState(resourceName); + for (int p = 0; p < partitions; p++) { + List<String> value = new ArrayList<String>(); + for (int r = 0; r < replicas; r++) { + int n = nodes[(p + r) % nodes.length]; + value.add("localhost_" + n); + } + idealState.getRecord().setListField(resourceName + "_" + p, value); + } + + idealState.setReplicas(Integer.toString(replicas)); + idealState.setStateModelDefRef("MasterSlave"); + idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO); + idealState.setNumPartitions(partitions); + idealStates.add(idealState); + + // System.out.println(idealState); + accessor.setProperty(keyBuilder.idealStates(resourceName), idealState); + } + return idealStates; + } + + protected void setupLiveInstances(String clusterName, int[] liveInstances) { + ZKHelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + Builder keyBuilder = accessor.keyBuilder(); + + for (int i = 0; i < liveInstances.length; i++) { + String instance = "localhost_" + liveInstances[i]; + LiveInstance liveInstance = new LiveInstance(instance); + liveInstance.setSessionId("session_" + liveInstances[i]); + liveInstance.setHelixVersion("0.0.0"); + accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance); + } + } + + protected void setupInstances(String clusterName, int[] instances) { + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + for (int i = 0; i < instances.length; i++) { + String instance = "localhost_" + instances[i]; + InstanceConfig instanceConfig = new InstanceConfig(instance); + instanceConfig.setHostName("localhost"); + instanceConfig.setPort("" + instances[i]); + instanceConfig.setInstanceEnabled(true); + admin.addInstance(clusterName, instanceConfig); + } + } + + protected void runPipeline(ClusterEvent event, Pipeline pipeline) { + try { + pipeline.handle(event); + pipeline.finish(); + } catch (Exception e) { + LOG.error("Exception while executing pipeline:" + pipeline + + ". Will not continue to next pipeline", e); + } + } + + protected void runStage(ClusterEvent event, Stage stage) throws Exception { + StageContext context = new StageContext(); + stage.init(context); + stage.preProcess(); + + // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in + // execute() function call + // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving runStage() + // to a shared library + if (stage instanceof AbstractAsyncBaseStage) { + ((AbstractAsyncBaseStage) stage).execute(event); + } else { + stage.process(event); + } + stage.postProcess(); + } + + /** + * Poll for the existence (or lack thereof) of a specific Helix property + * @param clazz the HelixProeprty subclass + * @param accessor connected HelixDataAccessor + * @param key the property key to look up + * @param shouldExist true if the property should exist, false otherwise + * @return the property if found, or null if it does not exist + */ + protected <T extends HelixProperty> T pollForProperty(Class<T> clazz, HelixDataAccessor accessor, + PropertyKey key, boolean shouldExist) throws InterruptedException { + final int POLL_TIMEOUT = 5000; + final int POLL_INTERVAL = 50; + T property = accessor.getProperty(key); + int timeWaited = 0; + while (((shouldExist && property == null) || (!shouldExist && property != null)) + && timeWaited < POLL_TIMEOUT) { + Thread.sleep(POLL_INTERVAL); + timeWaited += POLL_INTERVAL; + property = accessor.getProperty(key); + } + return property; + } + + /** + * Ensures that external view and current state are empty + */ + protected static class EmptyZkVerifier implements ClusterStateVerifier.ZkVerifier { + private final String _clusterName; + private final String _resourceName; + private final ZkClient _zkClient; + + /** + * Instantiate the verifier + * @param clusterName the cluster to verify + * @param resourceName the resource to verify + */ + public EmptyZkVerifier(String clusterName, String resourceName) { + _clusterName = clusterName; + _resourceName = resourceName; + _zkClient = ZKClientPool.getZkClient(ZK_ADDR); + } + + @Override + public boolean verify() { + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); + HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName)); + + // verify external view empty + if (externalView != null) { + for (String partition : externalView.getPartitionSet()) { + Map<String, String> stateMap = externalView.getStateMap(partition); + if (stateMap != null && !stateMap.isEmpty()) { + LOG.error("External view not empty for " + partition); + return false; + } + } + } + + // verify current state empty + List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances()); + for (String participant : liveParticipants) { + List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant)); + for (String sessionId : sessionIds) { + CurrentState currentState = + accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName)); + Map<String, String> partitionStateMap = currentState.getPartitionStateMap(); + if (partitionStateMap != null && !partitionStateMap.isEmpty()) { + LOG.error("Current state not empty for " + participant); + return false; + } + } + } + return true; + } + + @Override + public ZkClient getZkClient() { + return _zkClient; + } + + @Override + public String getClusterName() { + return _clusterName; + } + } +} + http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java index 9054a1d..ab62676 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java @@ -119,8 +119,8 @@ public class TestMessageThrottleStage extends ZkUnitTestBase { Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0")).size(), 1); + deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); - } @Test() @@ -310,8 +310,8 @@ public class TestMessageThrottleStage extends ZkUnitTestBase { Assert.assertTrue(throttleMessages.contains(msg3)); Assert.assertTrue(throttleMessages.contains(msg4)); + deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); - } private boolean containsConstraint(Set<ConstraintItem> constraints, ConstraintItem constraint) { http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java index d38907d..90363c1 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java @@ -118,8 +118,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase { messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node1"); + deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); - } @Test @@ -227,8 +227,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase { messages = accessor.getChildNames(keyBuilder.messages("localhost_0")); Assert.assertTrue(messages.isEmpty()); + deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); - } @Test @@ -323,6 +323,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { Assert.assertEquals(message.getToState(), "DROPPED"); Assert.assertEquals(message.getTgtName(), "localhost_0"); + deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } @@ -398,8 +399,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase { messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0")); Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0"); + deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); - } @Test @@ -464,6 +465,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { Assert.assertEquals(message.getToState(), "SLAVE"); Assert.assertEquals(message.getTgtName(), "localhost_1"); + deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java index 5701211..00509b3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java +++ b/helix-core/src/test/java/org/apache/helix/integration/SinglePartitionLeaderStandByTest.java @@ -25,7 +25,7 @@ import java.util.Date; import org.apache.helix.HelixConstants; import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; -import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; @@ -41,7 +41,7 @@ import org.testng.annotations.Test; * which helps us write integration tests easily */ -public class SinglePartitionLeaderStandByTest extends ZkIntegrationTestBase { +public class SinglePartitionLeaderStandByTest extends ZkTestBase { @Test public void test() throws Exception { @@ -99,6 +99,7 @@ public class SinglePartitionLeaderStandByTest extends ZkIntegrationTestBase { for (int i = 0; i < n; i++) { participants[i].syncStop(); } + TestHelper.dropCluster(clusterName, _gZkClient); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } } http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java index 8be880d..6aca358 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddClusterV2.java @@ -21,7 +21,7 @@ package org.apache.helix.integration; import java.util.Date; -import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterDistributedController; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.tools.ClusterSetup; @@ -33,7 +33,7 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -public class TestAddClusterV2 extends ZkIntegrationTestBase { +public class TestAddClusterV2 extends ZkTestBase { private static Logger LOG = LoggerFactory.getLogger(TestAddClusterV2.class); protected static final int CLUSTER_NR = 10; @@ -54,18 +54,6 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase { public void beforeClass() throws Exception { System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - String namespace = "/" + CONTROLLER_CLUSTER; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursively(namespace); - } - - for (int i = 0; i < CLUSTER_NR; i++) { - namespace = "/" + CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursively(namespace); - } - } - _setupTool = new ClusterSetup(ZK_ADDR); // setup CONTROLLER_CLUSTER @@ -136,6 +124,13 @@ public class TestAddClusterV2 extends ZkIntegrationTestBase { for (int i = 0; i < NODE_NR; i++) { _participants[i].syncStop(); } + + // delete clusters + for (int i = 0; i < CLUSTER_NR; i++) { + String clusterName = CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i; + _setupTool.getClusterManagementTool().dropCluster(clusterName); + } + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); } http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java index 7e83be2..1d5273a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddNodeAfterControllerStart.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.TestHelper; import org.apache.helix.ZkTestHelper; -import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.ClusterDistributedController; import org.apache.helix.integration.manager.MockParticipantManager; @@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.Test; -public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase { +public class TestAddNodeAfterControllerStart extends ZkTestBase { private static Logger LOG = LoggerFactory.getLogger(TestAddNodeAfterControllerStart.class); final String className = getShortClassName(); @@ -88,6 +88,7 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase { for (int i = 0; i < nodeNr; i++) { participants[i].syncStop(); } + _gSetupTool.deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } @@ -164,6 +165,7 @@ public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase { for (int i = 0; i < nodeNr; i++) { participants[i].syncStop(); } + _gSetupTool.deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java index b05b9fb..6209815 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixDataAccessor; @@ -39,7 +39,7 @@ import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; import org.testng.Assert; import org.testng.annotations.Test; -public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase { +public class TestAddStateModelFactoryAfterConnect extends ZkTestBase { @Test public void testBasic() throws Exception { // Logger.getRootLogger().setLevel(Level.INFO); @@ -130,6 +130,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase for (int i = 0; i < 5; i++) { participants[i].syncStop(); } + _gSetupTool.deleteCluster(clusterName); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java index 946eb1c..cd53552 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java @@ -68,17 +68,12 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { public void beforeClass() throws Exception { System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - String namespace = "/" + CLUSTER_NAME; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursively(namespace); - } - _setupTool = new ClusterSetup(ZK_ADDR); // setup storage cluster - _setupTool.addCluster(CLUSTER_NAME, true); + _gSetupTool.addCluster(CLUSTER_NAME, true); for (int i = 0; i < NODE_NR; i++) { String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); - _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); } // start controller @@ -108,19 +103,19 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { @Test (enabled = false) public void testParticipantUnavailable() { - _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, + _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name()); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3); HelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) .setResources(new HashSet<>(Collections.singleton(testDb))).build(); Assert.assertTrue(verifier.verify()); // disable then enable the resource to ensure no rebalancing error is generated during this process - _setupTool.dropResourceFromCluster(CLUSTER_NAME, testDb); - _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, testDb); + _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name()); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3); Assert.assertTrue(verifier.verify()); // Verify there is no rebalance error logged @@ -137,7 +132,7 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { checkRebalanceFailureGauge(true); // clean up - _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb); + _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb); for (int i = 0; i < NODE_NR; i++) { _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _participants[i].getInstanceName()); @@ -147,21 +142,21 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { @Test (enabled = false) public void testTagSetIncorrect() { - _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, + _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name()); // set expected instance tag IdealState is = - _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb); + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb); is.setInstanceGroupTag("RandomTag"); - _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, is); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, testDb, is); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3); // Verify there is rebalance error logged Assert.assertNotNull(pollForError(accessor, errorNodeKey)); checkRebalanceFailureGauge(true); // clean up - _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb); + _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb); } @Test (enabled = false) @@ -188,10 +183,10 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { // Error may be recorded unexpectedly when a resource from other tests is not cleaned up. accessor.removeProperty(errorNodeKey); - _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, + _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name(), CrushRebalanceStrategy.class.getName()); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas); HelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) .setResources(new HashSet<>(Collections.singleton(testDb))).build(); @@ -212,13 +207,13 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase { for (int i = replicas; i < NODE_NR; i++) { setDomainId(_participants[i].getInstanceName(), configAccessor); } - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas); Thread.sleep(1000); // Verify that rebalance error state is removed checkRebalanceFailureGauge(false); // clean up - _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb); + _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb); clusterConfig.setTopologyAwareEnabled(false); } http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java index 0c61b27..2fa8c9f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java @@ -46,8 +46,8 @@ public class TestBasicSpectator extends ZkStandAloneCMTestBase implements relayHelixManager.connect(); relayHelixManager.addExternalViewChangeListener(this); - _setupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3); + _gSetupTool.addResourceToCluster(CLUSTER_NAME, "NextDB", 64, STATE_MODEL); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "NextDB", 3); boolean result = ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier( http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java index 3feb2e0..d7c3a2d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java @@ -54,15 +54,15 @@ public class TestBatchMessageHandling extends ZkStandAloneCMTestBase { IdealState idealState = new FullAutoModeISBuilder(dbName).setStateModel("OnlineOffline") .setStateModelFactoryName("TestFactory").setNumPartitions(10).setNumReplica(1).build(); idealState.setBatchMessageMode(true); - _setupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, idealState); - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1); + _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, idealState); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1); Thread.sleep(1000L); int numOfOnlines = 0; int numOfErrors = 0; ExternalView externalView = - _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName); + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName); for (String partition : externalView.getPartitionSet()) { if (externalView.getStateMap(partition).values().contains("ONLINE")) { numOfOnlines++; http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java index 1b764d4..7c90e5f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java @@ -30,7 +30,7 @@ import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.NotificationContext.Type; -import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -45,7 +45,7 @@ import org.apache.helix.tools.DefaultIdealStateCalculator; import org.testng.Assert; import org.testng.annotations.Test; -public class TestBucketizedResource extends ZkIntegrationTestBase { +public class TestBucketizedResource extends ZkTestBase { private void setupCluster(String clusterName, List<String> instanceNames, String dbName, int replica, int partitions, int bucketSize) { http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java index 3c3f4ac..7dd477a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestCMWithFailParticipant.java @@ -21,10 +21,10 @@ package org.apache.helix.integration; import java.util.Date; -import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.common.ZkTestBase; import org.testng.annotations.Test; -public class TestCMWithFailParticipant extends ZkIntegrationTestBase { +public class TestCMWithFailParticipant extends ZkTestBase { // ZkClient _zkClient; // // @BeforeClass () http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java index 05b3e99..e01afc9 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestCarryOverBadCurState.java @@ -24,7 +24,7 @@ import java.util.Date; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; -import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.common.ZkTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.tools.ClusterStateVerifier; @@ -33,7 +33,7 @@ import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier; import org.testng.Assert; import org.testng.annotations.Test; -public class TestCarryOverBadCurState extends ZkIntegrationTestBase { +public class TestCarryOverBadCurState extends ZkTestBase { @Test public void testCarryOverBadCurState() throws Exception { System.out.println("START testCarryOverBadCurState at " + new Date(System.currentTimeMillis())); @@ -83,7 +83,7 @@ public class TestCarryOverBadCurState extends ZkIntegrationTestBase { for (int i = 0; i < 5; i++) { participants[i].syncStop(); } + _gSetupTool.deleteCluster(clusterName); System.out.println("END testCarryOverBadCurState at " + new Date(System.currentTimeMillis())); - } } http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java index e7a7ea0..61aadde 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestCleanupExternalView.java @@ -98,8 +98,8 @@ public class TestCleanupExternalView extends ZkUnitTestBase { accessor.removeProperty(keyBuilder.currentState("localhost_12918", liveInstance.getSessionId(), "TestDB0")); liveInstance = accessor.getProperty(keyBuilder.liveInstance("localhost_12919")); - accessor.removeProperty(keyBuilder.currentState("localhost_12919", liveInstance.getSessionId(), - "TestDB0")); + accessor.removeProperty( + keyBuilder.currentState("localhost_12919", liveInstance.getSessionId(), "TestDB0")); // re-enable controller shall remove orphan external-view // System.out.println("re-enabling controller"); @@ -123,6 +123,7 @@ public class TestCleanupExternalView extends ZkUnitTestBase { for (int i = 0; i < n; i++) { participants[i].syncStop(); } + TestHelper.dropCluster(clusterName, _gZkClient); System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } http://git-wip-us.apache.org/repos/asf/helix/blob/c0d5792b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java index 21a9ac2..9d5ad3e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestClusterStartsup.java @@ -38,20 +38,14 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase { void setupCluster() throws HelixException { System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); - String namespace = "/" + CLUSTER_NAME; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursively(namespace); - } - _setupTool = new ClusterSetup(ZK_ADDR); - // setup storage cluster - _setupTool.addCluster(CLUSTER_NAME, true); - _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 20, STATE_MODEL); + _gSetupTool.addCluster(CLUSTER_NAME, true); + _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 20, STATE_MODEL); for (int i = 0; i < NODE_NR; i++) { String storageNodeName = "localhost_" + (START_PORT + i); - _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); } - _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3); } @Override @@ -60,11 +54,6 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase { } - @Override - @AfterClass() - public void afterClass() { - } - @Test() public void testParticipantStartUp() throws Exception { setupCluster(); @@ -135,5 +124,8 @@ public class TestClusterStartsup extends ZkStandAloneCMTestBase { AssertJUnit.assertFalse(manager.isConnected()); } + if (manager != null) { + manager.disconnect(); + } } }
