Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x e2e3fec2d -> d5687fa41
[HELIX-562] TaskRebalancer doesn't honor MaxAttemptsPerTask when FailureThreshold is larger than 0, rb=29941 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d5687fa4 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d5687fa4 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d5687fa4 Branch: refs/heads/helix-0.6.x Commit: d5687fa41d091420f78023d950a9dc33f5e769ab Parents: e2e3fec Author: zzhang <[email protected]> Authored: Thu Jan 15 14:51:01 2015 -0800 Committer: zzhang <[email protected]> Committed: Thu Jan 15 14:51:01 2015 -0800 ---------------------------------------------------------------------- .../controller/rebalancer/AutoRebalancer.java | 16 +- .../org/apache/helix/task/TaskRebalancer.java | 22 ++- .../task/TestTaskRebalancerRetryLimit.java | 170 +++++++++++++++++++ 3 files changed, 196 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/d5687fa4/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java index d9b70d4..a8d83a2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java @@ -124,13 +124,6 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator { int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); - if (LOG.isInfoEnabled()) { - LOG.info("currentMapping: " + currentMapping); - LOG.info("stateCountMap: " + stateCountMap); - LOG.info("liveNodes: " + liveNodes); - LOG.info("allNodes: " + allNodes); - LOG.info("maxPartition: " + 
maxPartition); - } ReplicaPlacementScheme placementScheme = new DefaultPlacementScheme(); placementScheme.init(_manager); _algorithm = @@ -139,8 +132,13 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator { ZNRecord newMapping = _algorithm.computePartitionAssignment(liveNodes, currentMapping, allNodes); - if (LOG.isInfoEnabled()) { - LOG.info("newMapping: " + newMapping); + if (LOG.isDebugEnabled()) { + LOG.debug("currentMapping: " + currentMapping); + LOG.debug("stateCountMap: " + stateCountMap); + LOG.debug("liveNodes: " + liveNodes); + LOG.debug("allNodes: " + allNodes); + LOG.debug("maxPartition: " + maxPartition); + LOG.debug("newMapping: " + newMapping); } IdealState newIdealState = new IdealState(resourceName); http://git-wip-us.apache.org/repos/asf/helix/blob/d5687fa4/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index a073b93..1c7a7a3 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -405,7 +405,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // For delayed tasks, trigger a rebalance event for the closest upcoming ready time scheduleForNextTask(jobResource, jobCtx, currentTime); - if (isJobComplete(jobCtx, allPartitions, skippedPartitions)) { + if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) { workflowCtx.setJobState(jobResource, TaskState.COMPLETED); jobCtx.setFinishTime(currentTime); if (isWorkflowComplete(workflowCtx, workflowConfig)) { @@ -421,6 +421,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // This includes all completed, failed, delayed, and already assigned partitions. 
Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions); addCompletedPartitions(excludeSet, jobCtx, allPartitions); + addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg); excludeSet.addAll(skippedPartitions); excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime)); // Get instance->[partition, ...] mappings for the target resource. @@ -607,10 +608,11 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { * context, false otherwise. */ private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions, - Set<Integer> skippedPartitions) { + Set<Integer> skippedPartitions, JobConfig cfg) { for (Integer pId : allPartitions) { TaskPartitionState state = ctx.getPartitionState(pId); - if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED) { + if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED + && !isTaskGivenup(ctx, cfg, pId)) { return false; } } @@ -794,6 +796,20 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } } + private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) { + return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask(); + } + + // add all partitions that have been tried maxNumberAttempts + private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds, + JobConfig cfg) { + for (Integer pId : pIds) { + if (isTaskGivenup(ctx, cfg, pId)) { + set.add(pId); + } + } + } + private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions, Set<Integer> excluded, int n) { List<Integer> result = new ArrayList<Integer>(); http://git-wip-us.apache.org/repos/asf/helix/blob/d5687fa4/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java new file mode 100644 index 0000000..b678d7e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerRetryLimit.java @@ -0,0 +1,170 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.HashMap; +import java.util.Map; + +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskPartitionState; +import org.apache.helix.task.TaskResult; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Test task will be retried up to MaxAttemptsPerTask {@see HELIX-562} + */ +public class TestTaskRebalancerRetryLimit extends ZkIntegrationTestBase { + private final String _clusterName = TestHelper.getTestClassName(); + private static final int _n = 5; + private static final int _p = 20; + private static final int _r = 3; + private final MockParticipantManager[] _participants = new MockParticipantManager[_n]; + private ClusterControllerManager _controller; + private HelixManager _manager; + private TaskDriver _driver; + + @BeforeClass + public void beforeClass() throws Exception { + ClusterSetup setup = new ClusterSetup(_gZkClient); + setup.addCluster(_clusterName, true); + for (int i = 0; i < _n; i++) { + String instanceName = 
"localhost_" + (12918 + i); + setup.addInstanceToCluster(_clusterName, instanceName); + } + + // Set up target db + setup.addResourceToCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _p, "MasterSlave"); + setup.rebalanceStorageCluster(_clusterName, WorkflowGenerator.DEFAULT_TGT_DB, _r); + + Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); + taskFactoryReg.put("ErrorTask", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new ErrorTask(); + } + }); + + // start dummy participants + for (int i = 0; i < _n; i++) { + String instanceName = "localhost_" + (12918 + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i], + taskFactoryReg)); + _participants[i].syncStart(); + } + + // start controller + String controllerName = "controller"; + _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName); + _controller.syncStart(); + + // create cluster manager + _manager = + HelixManagerFactory.getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, + ZK_ADDR); + _manager.connect(); + _driver = new TaskDriver(_manager); + + boolean result = + ClusterStateVerifier + .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + _clusterName)); + Assert.assertTrue(result); + } + + @AfterClass + public void afterClass() throws Exception { + _controller.syncStop(); + for (int i = 0; i < _n; i++) { + if (_participants[i] != null && _participants[i].isConnected()) { + _participants[i].syncStop(); + } + } + _manager.disconnect(); + } + + @Test + public void test() throws Exception { + String jobResource = TestHelper.getTestMethodName(); + Workflow flow = + 
WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(jobResource, + WorkflowGenerator.DEFAULT_COMMAND_CONFIG, JobConfig.MAX_ATTEMPTS_PER_TASK, + String.valueOf(2)).build(); + Map<String, Map<String, String>> jobConfigs = flow.getJobConfigs(); + for (Map<String, String> jobConfig : jobConfigs.values()) { + jobConfig.put(JobConfig.FAILURE_THRESHOLD, String.valueOf(Integer.MAX_VALUE)); + jobConfig.put(JobConfig.COMMAND, "ErrorTask"); + } + + _driver.start(flow); + + // Wait until the job completes. + TestUtil.pollForWorkflowState(_manager, jobResource, TaskState.COMPLETED); + + JobContext ctx = TaskUtil.getJobContext(_manager, TaskUtil.getNamespacedJobName(jobResource)); + for (int i = 0; i < _p; i++) { + TaskPartitionState state = ctx.getPartitionState(i); + if (state != null) { + Assert.assertEquals(state, TaskPartitionState.TASK_ERROR); + Assert.assertEquals(ctx.getPartitionNumAttempts(i), 2); + } + } + + } + + private static class ErrorTask implements Task { + public ErrorTask() { + } + + @Override + public TaskResult run() { + throw new RuntimeException("IGNORABLE exception: test throw exception from task"); + } + + @Override + public void cancel() { + } + } +}
