Repository: helix Updated Branches: refs/heads/master 79ebc0469 -> 55b844657
http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java new file mode 100644 index 0000000..7a87a0f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java @@ -0,0 +1,283 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.NotificationContext; +import org.apache.helix.api.config.StateTransitionThrottleConfig; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.mock.participant.MockTransition; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.Message; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase { + ConfigAccessor _configAccessor; + + @Override + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + _setupTool = new ClusterSetup(_gZkClient); + + // setup storage cluster + _setupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NODE_NR; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + // start dummy participants + for (int i = 0; i < NODE_NR - 2; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + participant.setTransition(new DelayedTransition()); + participant.syncStart(); + _participants[i] = participant; + } + + _configAccessor = new ConfigAccessor(_gZkClient); + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + } + + @Test() + public void testResourceThrottle() throws Exception { + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + + StateTransitionThrottleConfig resourceThrottle = + new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.RESOURCE, 2); + + StateTransitionThrottleConfig clusterThrottle = + new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 100); + + clusterConfig.setStateTransitionThrottleConfigs( + Arrays.asList(resourceThrottle, clusterThrottle)); + clusterConfig.setPersistIntermediateAssignment(true); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + List<String> dbs = new ArrayList<String>(); + + for (int i = 0; i < 5; i++) { + String dbName = "TestDB-" + i; + _setupTool.addResourceToCluster(CLUSTER_NAME, dbName, 10, STATE_MODEL, + RebalanceMode.FULL_AUTO + ""); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica); + dbs.add(dbName); + } + + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build(); + _clusterVerifier.verify(10000); + + DelayedTransition.setDelay(50); + DelayedTransition.enableThrottleRecord(); + + // add 2 nodes + for (int i = NODE_NR - 2; i < NODE_NR; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_')); + participant.syncStart(); + _participants[i] = participant; + } + + _clusterVerifier.verify(20000); + + for (String db : dbs) { + validateThrottle(DelayedTransition.getResourcePatitionTransitionTimes(), db, 2); + } + } + + private void validateThrottle( + Map<String, List<PartitionTransitionTime>> partitionTransitionTimesMap, + String throttledItemName, int maxPendingTransition) { + List<PartitionTransitionTime> pTimeList = partitionTransitionTimesMap.get(throttledItemName); + + Map<Long, List<PartitionTransitionTime>> startMap = + new HashMap<Long, List<PartitionTransitionTime>>(); + Map<Long, List<PartitionTransitionTime>> endMap = + new HashMap<Long, List<PartitionTransitionTime>>(); + List<Long> startEndPoints = new ArrayList<Long>(); + + if (pTimeList == null) { + System.out.println("no throttle result for :" + throttledItemName); + return; + } + Collections.sort(pTimeList, new Comparator<PartitionTransitionTime>() { + @Override + public int compare(PartitionTransitionTime o1, PartitionTransitionTime o2) { + return (int) (o1.start - o2.start); + } + }); + + for (PartitionTransitionTime interval : pTimeList) { + if (!startMap.containsKey(interval.start)) { + startMap.put(interval.start, new ArrayList<PartitionTransitionTime>()); + } + startMap.get(interval.start).add(interval); + + if (!endMap.containsKey(interval.end)) { + endMap.put(interval.end, new ArrayList<PartitionTransitionTime>()); + } + endMap.get(interval.end).add(interval); + startEndPoints.add(interval.start); + startEndPoints.add(interval.end); + } + + Collections.sort(startEndPoints); + + List<PartitionTransitionTime> temp = new ArrayList<PartitionTransitionTime>(); + + int maxInParallel = 0; + for (long point : startEndPoints) { + if (startMap.containsKey(point)) { + temp.addAll(startMap.get(point)); + } + int curSize = size(temp); + if (curSize > maxInParallel) { + maxInParallel = curSize; + } + if (endMap.containsKey(point)) { + temp.removeAll(endMap.get(point)); + } + } + + System.out.println( + "MaxInParallel: " + maxInParallel + " maxPendingTransition: " + maxPendingTransition); + Assert.assertTrue(maxInParallel <= maxPendingTransition, + "Throttle condition does not meet for " + throttledItemName); + } + + + private int size(List<PartitionTransitionTime> timeList) { + Set<String> partitions = new HashSet<String>(); + for (PartitionTransitionTime p : timeList) { + partitions.add(p.partition); + } + return partitions.size(); + } + + private static class PartitionTransitionTime { + String partition; + long start; + long end; + + public PartitionTransitionTime(String partition, long start, long end) { + this.partition = partition; + this.start = start; + this.end = end; + } + + @Override public String toString() { + return "[" + + "partition='" + partition + '\'' + + ", start=" + start + + ", end=" + end + + ']'; + } + } + + private static class DelayedTransition extends MockTransition { + private static long _delay = 0; + private static Map<String, List<PartitionTransitionTime>> resourcePatitionTransitionTimes = + new HashMap<String, List<PartitionTransitionTime>>(); + private static Map<String, List<PartitionTransitionTime>> instancePatitionTransitionTimes = + new HashMap<String, List<PartitionTransitionTime>>(); + private static boolean _recordThrottle = false; + + public static void setDelay(long delay) { + _delay = delay; + } + + public static Map<String, List<PartitionTransitionTime>> getResourcePatitionTransitionTimes() { + return resourcePatitionTransitionTimes; + } + + public static Map<String, List<PartitionTransitionTime>> getInstancePatitionTransitionTimes() { + return instancePatitionTransitionTimes; + } + + public static void enableThrottleRecord() { + _recordThrottle = true; + } + + @Override public void doTransition(Message message, NotificationContext context) + throws InterruptedException { + long start = System.currentTimeMillis(); + if (_delay > 0) { + Thread.sleep(_delay); + } + long end = System.currentTimeMillis(); + if (_recordThrottle) { + PartitionTransitionTime partitionTransitionTime = + new PartitionTransitionTime(message.getPartitionName(), start, end); + + /*System.out.println(String + .format("Transit resource %s partition %s from %s to %s at instance %s: %s", + message.getResourceName(), message.getPartitionName(), message.getFromState(), + message.getToState(), message.getTgtName(), partitionTransitionTime)); + */ + if (!resourcePatitionTransitionTimes.containsKey(message.getResourceName())) { + resourcePatitionTransitionTimes + .put(message.getResourceName(), new ArrayList<PartitionTransitionTime>()); + } + resourcePatitionTransitionTimes.get(message.getResourceName()).add(partitionTransitionTime); + + if (!instancePatitionTransitionTimes.containsKey(message.getTgtName())) { + instancePatitionTransitionTimes + .put(message.getTgtName(), new ArrayList<PartitionTransitionTime>()); + } + instancePatitionTransitionTimes.get(message.getTgtName()).add(partitionTransitionTime); + } + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/4e487196/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java index d89533a..641f13a 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java @@ -286,7 +286,7 @@ public class TaskTestUtil { runStage(event, stage); } - return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString()); + return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); } /**
