Fix TestZkConnectionLost to use seperate zk server to avoid stuck other tests.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/aaa632f3 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/aaa632f3 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/aaa632f3 Branch: refs/heads/master Commit: aaa632f3d53a0268bfbf1b7ed18996cdc72894a4 Parents: 2049f93 Author: Lei Xia <[email protected]> Authored: Thu May 17 13:29:31 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Mon Jul 9 15:56:40 2018 -0700 ---------------------------------------------------------------------- .../helix/integration/TestZkConnectionLost.java | 22 ++++++++-------- .../helix/task/TaskSynchronizedTestBase.java | 27 +++++++++++++++++--- 2 files changed, 35 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/aaa632f3/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java index 6fb966e..3721b2c 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java @@ -34,30 +34,33 @@ public class TestZkConnectionLost extends TaskTestBase { private final AtomicReference<ZkServer> _zkServerRef = new AtomicReference<>(); + private String _zkAddr = "localhost:2189"; + @BeforeClass public void beforeClass() throws Exception { + ZkServer zkServer = TestHelper.startZkServer(_zkAddr); + _zkServerRef.set(zkServer); + _participants = new MockParticipantManager[_numNodes]; String namespace = "/" + CLUSTER_NAME; if (_gZkClient.exists(namespace)) { _gZkClient.deleteRecursively(namespace); } - _setupTool = new ClusterSetup(ZK_ADDR); + _setupTool = new ClusterSetup(_zkAddr); _setupTool.addCluster(CLUSTER_NAME, true); setupParticipants(); setupDBs(); - createManagers(); + createManagers(_zkAddr, CLUSTER_NAME); // start controller String controllerName = CONTROLLER_PREFIX + "_0"; - _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller = new ClusterControllerManager(_zkAddr, CLUSTER_NAME, controllerName); _controller.syncStart(); HelixClusterVerifier clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(_zkAddr).build(); Assert.assertTrue(clusterVerifier.verify()); - - _zkServerRef.set(_zkServer); } @Test @@ -66,7 +69,7 @@ public class TestZkConnectionLost extends TaskTestBase { System.setProperty("zk.session.timeout", "1000"); String queueName = TestHelper.getTestMethodName(); - startParticipants(); + startParticipants(_zkAddr); // Create a queue LOG.info("Starting job-queue: " + queueName); @@ -78,7 +81,6 @@ public class TestZkConnectionLost extends TaskTestBase { restartZkServer(); WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); - // ensure job 1 is started before stop it String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); _driver.pollForWorkflowState(scheduledQueue, 10000, TaskState.COMPLETED); } @@ -91,7 +93,7 @@ public class TestZkConnectionLost extends TaskTestBase { String queueName = TestHelper.getTestMethodName(); stopParticipants(); - startParticipants(); + startParticipants(_zkAddr); LOG.info("Starting job-queue: " + queueName); JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 6000); @@ -124,7 +126,7 @@ public class TestZkConnectionLost extends TaskTestBase { TestHelper.stopZkServer(_zkServerRef.get()); Thread.sleep(300); System.out.println("Restart ZK server"); - _zkServerRef.set(TestHelper.startZkServer(ZK_ADDR, null, false)); + _zkServerRef.set(TestHelper.startZkServer(_zkAddr, null, false)); } catch (Exception e) { LOG.error(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/helix/blob/aaa632f3/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java index c377233..cab60ce 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java @@ -121,16 +121,30 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase { } protected void startParticipants() { - startParticipants(_numNodes); + startParticipants(ZK_ADDR, _numNodes); + } + + protected void startParticipants(String zkAddr) { + startParticipants(zkAddr, _numNodes); } protected void startParticipants(int numNodes) { for (int i = 0; i < numNodes; i++) { - startParticipant(i); + startParticipant(ZK_ADDR, i); + } + } + + protected void startParticipants(String zkAddr, int numNodes) { + for (int i = 0; i < numNodes; i++) { + startParticipant(zkAddr, i); } } protected void startParticipant(int i) { + startParticipant(ZK_ADDR, i); + } + + protected void startParticipant(String zkAddr, int i) { Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() { @Override public Task createNewTask(TaskCallbackContext context) { @@ -138,7 +152,7 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase { } }); String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); - _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i] = new MockParticipantManager(zkAddr, CLUSTER_NAME, instanceName); // Register a Task state model factory. StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); @@ -165,12 +179,17 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase { protected void createManagers() throws Exception { + createManagers(ZK_ADDR, CLUSTER_NAME); + } + + protected void createManagers(String zkAddr, String clusterName) throws Exception { _manager = HelixManagerFactory - .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR); + .getZKHelixManager(clusterName, "Admin", InstanceType.ADMINISTRATOR, zkAddr); _manager.connect(); _driver = new TaskDriver(_manager); } + public void setSingleTestEnvironment() { _numDbs = 1; _numNodes = 1;
