Repository: helix Updated Branches: refs/heads/master 00dc16db3 -> cfacbbac3
[HELIX-519] Add integration tests to ensure that kill-switch for Helix tasks work as expected, rb=26212 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/cfacbbac Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/cfacbbac Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/cfacbbac Branch: refs/heads/master Commit: cfacbbac33d302d80968e7f11fe3b69468a17606 Parents: 00dc16d Author: zzhang <[email protected]> Authored: Wed Oct 1 11:08:39 2014 -0700 Committer: zzhang <[email protected]> Committed: Wed Oct 1 12:54:58 2014 -0700 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 20 +---- .../java/org/apache/helix/task/TaskDriver.java | 28 +++--- .../task/TestTaskRebalancerStopResume.java | 90 ++++++++++++++++++++ .../apache/helix/integration/task/TestUtil.java | 21 ++++- 4 files changed, 126 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/cfacbbac/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index d36b6f5..6fa3d05 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.helix.ControllerChangeListener; import org.apache.helix.CurrentStateChangeListener; -import org.apache.helix.ExternalViewChangeListener; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixProperty; @@ -66,7 +65,6 @@ import org.apache.helix.controller.stages.ResourceValidationStage; import org.apache.helix.controller.stages.TaskAssignmentStage; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.CurrentState; -import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; @@ -94,7 +92,7 @@ import com.google.common.collect.Lists; */ public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener, - ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener, + ControllerChangeListener, InstanceConfigChangeListener, ScopedConfigChangeListener { private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName()); volatile boolean init = false; @@ -180,7 +178,7 @@ public class GenericHelixController implements IdealStateChangeListener, } /** - * Starts the rebalancing timer + * Stops the rebalancing timer */ void stopRebalancingTimer() { if (_rebalanceTimer != null) { @@ -278,7 +276,7 @@ public class GenericHelixController implements IdealStateChangeListener, if (context != null) { if (context.getType() == Type.FINALIZE) { stopRebalancingTimer(); - logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName()); + logger.info("Get FINALIZE notification, skip the pipeline. Event: " + event.getName()); return; } else { if (_clusterStatusMonitor == null) { @@ -319,18 +317,6 @@ public class GenericHelixController implements IdealStateChangeListener, // callback @Override - public void onExternalViewChange(List<ExternalView> externalViewList, - NotificationContext changeContext) { - // logger.info("START: GenericClusterController.onExternalViewChange()"); - // ClusterEvent event = new ClusterEvent("externalViewChange"); - // event.addAttribute("helixmanager", changeContext.getManager()); - // event.addAttribute("changeContext", changeContext); - // event.addAttribute("eventData", externalViewList); - // _eventQueue.put(event); - // logger.info("END: GenericClusterController.onExternalViewChange()"); - } - - @Override public void onStateChange(String instanceName, List<CurrentState> statesInfo, NotificationContext changeContext) { logger.info("START: GenericClusterController.onStateChange()"); http://git-wip-us.apache.org/repos/asf/helix/blob/cfacbbac/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index a341a3b..bcbe76a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -125,13 +125,13 @@ public class TaskDriver { } break; case stop: - driver.setTaskTargetState(resource, TargetState.STOP); + driver.setWorkflowTargetState(resource, TargetState.STOP); break; case resume: - driver.setTaskTargetState(resource, TargetState.START); + driver.setWorkflowTargetState(resource, TargetState.START); break; case delete: - driver.setTaskTargetState(resource, TargetState.DELETE); + driver.setWorkflowTargetState(resource, TargetState.DELETE); break; case list: driver.list(resource); @@ -357,36 +357,36 @@ public class TaskDriver { /** Public method to resume a workflow/queue */ public void resume(String workflow) { - setTaskTargetState(workflow, TargetState.START); + setWorkflowTargetState(workflow, TargetState.START); } /** Public method to stop a workflow/queue */ public void stop(String workflow) { - setTaskTargetState(workflow, TargetState.STOP); + setWorkflowTargetState(workflow, TargetState.STOP); } /** Public method to delete a workflow/queue */ public void delete(String workflow) { - setTaskTargetState(workflow, TargetState.DELETE); + setWorkflowTargetState(workflow, TargetState.DELETE); } - /** Helper function to change target state for a given task */ - private void setTaskTargetState(String jobResource, TargetState state) { - setSingleTaskTargetState(jobResource, state); + /** Helper function to change target state for a given workflow */ + private void setWorkflowTargetState(String workflowName, TargetState state) { + setSingleWorkflowTargetState(workflowName, state); // For recurring schedules, child workflows must also be handled HelixDataAccessor accessor = _manager.getHelixDataAccessor(); List<String> resources = accessor.getChildNames(accessor.keyBuilder().resourceConfigs()); + String prefix = workflowName + "_" + TaskConstants.SCHEDULED; for (String resource : resources) { - String prefix = resource + "_" + TaskConstants.SCHEDULED; if (resource.startsWith(prefix)) { - setSingleTaskTargetState(resource, state); + setSingleWorkflowTargetState(resource, state); } } } - /** Helper function to change target state for a given task */ - private void setSingleTaskTargetState(String jobResource, final TargetState state) { + /** Helper function to change target state for a given workflow */ + private void setSingleWorkflowTargetState(String workflowName, final TargetState state) { HelixDataAccessor accessor = _manager.getHelixDataAccessor(); DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { @Override @@ -402,7 +402,7 @@ public class TaskDriver { List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList(); updaters.add(updater); List<String> paths = Lists.newArrayList(); - paths.add(accessor.keyBuilder().resourceConfig(jobResource).getPath()); + paths.add(accessor.keyBuilder().resourceConfig(workflowName).getPath()); accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT); invokeRebalance(); } http://git-wip-us.apache.org/repos/asf/helix/blob/cfacbbac/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java index 18d2df1..b9e9811 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java @@ -20,17 +20,26 @@ package org.apache.helix.integration.task; */ import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.manager.zk.MockParticipant; import org.apache.helix.manager.zk.MockController; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.JobDag; +import org.apache.helix.task.JobQueue; import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskDriver; @@ -38,8 +47,10 @@ import org.apache.helix.task.TaskFactory; import org.apache.helix.task.TaskResult; import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.TaskUtil; import org.apache.helix.task.Workflow; import org.apache.helix.testutil.ZkTestBase; +import org.apache.helix.task.WorkflowConfig; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.log4j.Logger; import org.testng.Assert; @@ -48,6 +59,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; public class TestTaskRebalancerStopResume extends ZkTestBase { private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class); @@ -68,6 +80,8 @@ public class TestTaskRebalancerStopResume extends ZkTestBase { @BeforeClass public void beforeClass() throws Exception { + System.out.println("START " + CLUSTER_NAME + " at " + new Date(System.currentTimeMillis())); + String namespace = "/" + CLUSTER_NAME; if (_zkclient.exists(namespace)) { _zkclient.deleteRecursive(namespace); @@ -127,15 +141,18 @@ public class TestTaskRebalancerStopResume extends ZkTestBase { .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, CLUSTER_NAME)); Assert.assertTrue(result); + System.out.println("END BEFORECLASS " + CLUSTER_NAME + " at " + new Date(System.currentTimeMillis())); } @AfterClass public void afterClass() throws Exception { + System.out.println("START AFTERCLASS " + CLUSTER_NAME + " at " + new Date(System.currentTimeMillis())); _controller.syncStop(); for (int i = 0; i < n; i++) { _participants[i].syncStop(); } _manager.disconnect(); + System.out.println("END " + CLUSTER_NAME + " at " + new Date(System.currentTimeMillis())); } @Test @@ -176,6 +193,79 @@ public class TestTaskRebalancerStopResume extends ZkTestBase { TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED); } + @Test + public void stopAndResumeNamedQueue() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue queue = new JobQueue.Builder(queueName).build(); + _driver.createQueue(queue); + + // Enqueue jobs + Set<String> master = Sets.newHashSet("MASTER"); + JobConfig.Builder job1 = + new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master); + String job1Name = "masterJob"; + LOG.info("Enqueuing job: " + job1Name); + _driver.enqueueJob(queueName, job1Name, job1); + + Set<String> slave = Sets.newHashSet("SLAVE"); + JobConfig.Builder job2 = + new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave); + String job2Name = "slaveJob"; + LOG.info("Enqueuing job: " + job2Name); + _driver.enqueueJob(queueName, job2Name, job2); + + String namespacedJob1 = String.format("%s_%s", queueName, job1Name); + TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.IN_PROGRESS); + + // stop job1 + LOG.info("Pausing job-queue: " + queueName); + _driver.stop(queueName); + TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.STOPPED); + TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED); + + // Ensure job2 is not started + TimeUnit.MILLISECONDS.sleep(200); + String namespacedJob2 = String.format("%s_%s", queueName, job2Name); + TestUtil.pollForEmptyJobState(_manager, queueName, job2Name); + + LOG.info("Resuming job-queue: " + queueName); + _driver.resume(queueName); + + // Ensure successful completion + TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED); + TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED); + JobContext masterJobContext = TaskUtil.getJobContext(_manager, namespacedJob1); + JobContext slaveJobContext = TaskUtil.getJobContext(_manager, namespacedJob2); + + // Ensure correct ordering + long job1Finish = masterJobContext.getFinishTime(); + long job2Start = slaveJobContext.getStartTime(); + Assert.assertTrue(job2Start >= job1Finish); + + // Flush queue and check cleanup + LOG.info("Flusing job-queue: " + queueName); + _driver.flushQueue(queueName); + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob1))); + Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1))); + Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2))); + Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2))); + WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName); + JobDag dag = workflowCfg.getJobDag(); + Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1)); + Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2)); + Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob1)); + Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob2)); + Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1)); + Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2)); + } + public static class ReindexTask implements Task { private final long _delay; private volatile boolean _canceled; http://git-wip-us.apache.org/repos/asf/helix/blob/cfacbbac/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java index f599920..413b98a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java @@ -20,6 +20,7 @@ package org.apache.helix.integration.task; */ import org.apache.helix.HelixManager; +import org.apache.helix.TestHelper; import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskUtil; import org.apache.helix.task.WorkflowContext; @@ -29,6 +30,8 @@ import org.testng.Assert; * Static test utility methods. */ public class TestUtil { + private final static int _default_timeout = 2 * 60 * 1000; /* 2 mins */ + /** * Polls {@link org.apache.helix.task.JobContext} for given task resource until a timeout is * reached. @@ -45,7 +48,7 @@ public class TestUtil { Thread.sleep(100); ctx = TaskUtil.getWorkflowContext(manager, workflowResource); } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state) - && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */); + && System.currentTimeMillis() < st + _default_timeout); Assert.assertNotNull(ctx); Assert.assertEquals(ctx.getWorkflowState(), state); @@ -60,8 +63,22 @@ public class TestUtil { Thread.sleep(100); ctx = TaskUtil.getWorkflowContext(manager, workflowResource); } while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName) != state) - && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */); + && System.currentTimeMillis() < st + _default_timeout); Assert.assertNotNull(ctx); + Assert.assertEquals(ctx.getJobState(jobName), state); } + public static void pollForEmptyJobState(final HelixManager manager, final String workflowName, + final String jobName) throws Exception { + final String namespacedJobName = String.format("%s_%s", workflowName, jobName); + boolean succeed = TestHelper.verify(new TestHelper.Verifier() { + + @Override + public boolean verify() throws Exception { + WorkflowContext ctx = TaskUtil.getWorkflowContext(manager, workflowName); + return ctx == null || ctx.getJobState(namespacedJobName) == null; + } + }, _default_timeout); + Assert.assertTrue(succeed); + } }
