Repository: helix Updated Branches: refs/heads/helix-0.6.x 2ebfe7d19 -> 8c5e63ab2
[HELIX-518] Add integration tests to ensure helix tasks work as expected during master failover, rb=26272 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8c5e63ab Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8c5e63ab Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8c5e63ab Branch: refs/heads/helix-0.6.x Commit: 8c5e63ab263d2cbdf1f17bb98335afb69974be99 Parents: 2ebfe7d Author: zzhang <[email protected]> Authored: Fri Oct 3 12:03:43 2014 -0700 Committer: zzhang <[email protected]> Committed: Fri Oct 3 12:03:43 2014 -0700 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 16 +- .../helix/integration/task/DummyTask.java | 72 +++++++ .../task/TestTaskRebalancerFailover.java | 216 +++++++++++++++++++ 3 files changed, 289 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/8c5e63ab/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 546e3bb..81c2e3d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -32,7 +32,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.helix.ConfigChangeListener; import org.apache.helix.ControllerChangeListener; import org.apache.helix.CurrentStateChangeListener; -import org.apache.helix.ExternalViewChangeListener; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.IdealStateChangeListener; @@ -60,7 +59,6 @@ import org.apache.helix.controller.stages.ResourceComputationStage; import org.apache.helix.controller.stages.ResourceValidationStage; import org.apache.helix.controller.stages.TaskAssignmentStage; import org.apache.helix.model.CurrentState; -import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; @@ -84,7 +82,7 @@ import org.apache.log4j.Logger; */ public class GenericHelixController implements ConfigChangeListener, IdealStateChangeListener, LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener, - ExternalViewChangeListener, ControllerChangeListener, InstanceConfigChangeListener { + ControllerChangeListener, InstanceConfigChangeListener { private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName()); volatile boolean init = false; private final PipelineRegistry _registry; @@ -307,18 +305,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC // callback @Override - public void onExternalViewChange(List<ExternalView> externalViewList, - NotificationContext changeContext) { - // logger.info("START: GenericClusterController.onExternalViewChange()"); - // ClusterEvent event = new ClusterEvent("externalViewChange"); - // event.addAttribute("helixmanager", changeContext.getManager()); - // event.addAttribute("changeContext", changeContext); - // event.addAttribute("eventData", externalViewList); - // _eventQueue.put(event); - // logger.info("END: GenericClusterController.onExternalViewChange()"); - } - - @Override public void onStateChange(String instanceName, List<CurrentState> statesInfo, NotificationContext changeContext) { logger.info("START: GenericClusterController.onStateChange()"); http://git-wip-us.apache.org/repos/asf/helix/blob/8c5e63ab/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java new file mode 100644 index 0000000..b6054d0 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/DummyTask.java @@ -0,0 +1,72 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.Map; + +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskResult; + +public class DummyTask implements Task { + private static final String TIMEOUT_CONFIG = "Timeout"; + private final long _delay; + private volatile boolean _canceled; + + public DummyTask(TaskCallbackContext context) { + JobConfig jobCfg = context.getJobConfig(); + Map<String, String> cfg = jobCfg.getJobCommandConfigMap(); + if (cfg == null) { + cfg = Collections.emptyMap(); + } + _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L; + } + + @Override + public TaskResult run() { + long expiry = System.currentTimeMillis() + _delay; + long timeLeft; + while (System.currentTimeMillis() < expiry) { + if (_canceled) { + timeLeft = expiry - System.currentTimeMillis(); + return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0 + : timeLeft)); + } + sleep(50); + } + timeLeft = expiry - System.currentTimeMillis(); + return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft)); + } + + @Override + public void cancel() { + _canceled = true; + } + + private static void sleep(long d) { + try { + Thread.sleep(d); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/8c5e63ab/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java new file mode 100644 index 0000000..b8e1c09 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerFailover.java @@ -0,0 +1,216 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ExternalView; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.JobDag; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.collect.Sets; + +public class TestTaskRebalancerFailover extends ZkUnitTestBase { + private static final Logger LOG = Logger.getLogger(TestTaskRebalancerFailover.class); + + private final String _clusterName = TestHelper.getTestClassName(); + private static final int _n = 5; + private static final int _p = 20; + private static final int _r = 3; + private final MockParticipantManager[] _participants = new MockParticipantManager[_n]; + private ClusterControllerManager _controller; + private HelixManager _manager; + private TaskDriver _driver; + + @BeforeClass + public void beforeClass() throws Exception { + ClusterSetup setup = new ClusterSetup(_gZkClient); + setup.addCluster(_clusterName, true); + for (int i = 0; i < _n; i++) { + String instanceName = "localhost_" + (12918 + i); + setup.addInstanceToCluster(_clusterName, instanceName); + } + + // Set up target db + setup.addResourceToCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _p, "MasterSlave"); + setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r); + + Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); + taskFactoryReg.put("DummyTask", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new DummyTask(context); + } + }); + + // start dummy participants + for (int i = 0; i < _n; i++) { + String instanceName = "localhost_" + (12918 + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i], + taskFactoryReg)); + _participants[i].syncStart(); + } + + // start controller + String controllerName = "controller"; + _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName); + _controller.syncStart(); + + // create cluster manager + _manager = + HelixManagerFactory.getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, + ZK_ADDR); + _manager.connect(); + _driver = new TaskDriver(_manager); + + boolean result = + ClusterStateVerifier + .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + _clusterName)); + Assert.assertTrue(result); + } + + @AfterClass + public void afterClass() throws Exception { + _controller.syncStop(); + for (int i = 0; i < _n; i++) { + if (_participants[i] != null && _participants[i].isConnected()) { + _participants[i].syncStop(); + } + } + _manager.disconnect(); + } + + @Test + public void test() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue queue = new JobQueue.Builder(queueName).build(); + _driver.createQueue(queue); + + // Enqueue jobs + Set<String> master = Sets.newHashSet("MASTER"); + JobConfig.Builder job = + new JobConfig.Builder().setCommand("DummyTask") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master); + String job1Name = "masterJob"; + LOG.info("Enqueuing job: " + job1Name); + _driver.enqueueJob(queueName, job1Name, job); + + // check all tasks completed on MASTER + String namespacedJob1 = String.format("%s_%s", queueName, job1Name); + TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED); + + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + ExternalView ev = + accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB)); + JobContext ctx = TaskUtil.getJobContext(_manager, namespacedJob1); + Set<String> failOverPartitions = Sets.newHashSet(); + for (int p = 0; p < _p; p++) { + String instanceName = ctx.getAssignedParticipant(p); + Assert.assertNotNull(instanceName); + String partitionName = ctx.getTargetForPartition(p); + Assert.assertNotNull(partitionName); + String state = ev.getStateMap(partitionName).get(instanceName); + Assert.assertNotNull(state); + Assert.assertEquals(state, "MASTER"); + if (instanceName.equals("localhost_12918")) { + failOverPartitions.add(partitionName); + } + } + + // enqueue another master job and fail localhost_12918 + String job2Name = "masterJob2"; + String namespacedJob2 = String.format("%s_%s", queueName, job2Name); + LOG.info("Enqueuing job: " + job2Name); + _driver.enqueueJob(queueName, job2Name, job); + + TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.IN_PROGRESS); + _participants[0].syncStop(); + TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED); + + // tasks previously assigned to localhost_12918 should be re-scheduled on new master + ctx = TaskUtil.getJobContext(_manager, namespacedJob2); + ev = accessor.getProperty(keyBuilder.externalView(WorkflowGenerator.DEFAULT_TGT_DB)); + for (int p = 0; p < _p; p++) { + String partitionName = ctx.getTargetForPartition(p); + Assert.assertNotNull(partitionName); + if (failOverPartitions.contains(partitionName)) { + String instanceName = ctx.getAssignedParticipant(p); + Assert.assertNotNull(instanceName); + Assert.assertNotSame(instanceName, "localhost_12918"); + String state = ev.getStateMap(partitionName).get(instanceName); + Assert.assertNotNull(state); + Assert.assertEquals(state, "MASTER"); + } + } + + // Flush queue and check cleanup + _driver.flushQueue(queueName); + Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob1))); + Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob1))); + Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(namespacedJob2))); + Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(namespacedJob2))); + WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, queueName); + JobDag dag = workflowCfg.getJobDag(); + Assert.assertFalse(dag.getAllNodes().contains(namespacedJob1)); + Assert.assertFalse(dag.getAllNodes().contains(namespacedJob2)); + Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob1)); + Assert.assertFalse(dag.getChildrenToParents().containsKey(namespacedJob2)); + Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob1)); + Assert.assertFalse(dag.getParentsToChildren().containsKey(namespacedJob2)); + } +}
