http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/AbstractLoadBalancingPolicyTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/AbstractLoadBalancingPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/AbstractLoadBalancingPolicyTest.java new file mode 100644 index 0000000..6257bc9 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/AbstractLoadBalancingPolicyTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import static org.testng.Assert.assertEquals; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.event.AttributeSensor; +import org.apache.brooklyn.test.entity.TestApplication; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.DynamicGroup; +import brooklyn.entity.basic.Entities; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.Sensors; +import org.apache.brooklyn.location.basic.SimulatedLocation; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.time.Time; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class AbstractLoadBalancingPolicyTest { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractLoadBalancingPolicyTest.class); + + protected static final long TIMEOUT_MS = 10*1000; + protected static final long SHORT_WAIT_MS = 250; + + protected static final long CONTAINER_STARTUP_DELAY_MS = 100; + + public static final AttributeSensor<Integer> TEST_METRIC = + Sensors.newIntegerSensor("test.metric", "Dummy workrate for test entities"); + + public static final ConfigKey<Double> LOW_THRESHOLD_CONFIG_KEY = new BasicConfigKey<Double>(Double.class, TEST_METRIC.getName()+".threshold.low", "desc", 0.0); + public static final ConfigKey<Double> HIGH_THRESHOLD_CONFIG_KEY = new BasicConfigKey<Double>(Double.class, TEST_METRIC.getName()+".threshold.high", "desc", 0.0); + + protected TestApplication app; + protected SimulatedLocation loc; + protected BalanceableWorkerPool pool; + protected DefaultBalanceablePoolModel<Entity, Entity> model; + protected LoadBalancingPolicy policy; + protected Group containerGroup; + protected Group itemGroup; + protected Random random = new Random(); + + @BeforeMethod(alwaysRun=true) + public void before() { + LOG.debug("In AbstractLoadBalancingPolicyTest.before()"); + + MockItemEntityImpl.totalMoveCount.set(0); + MockItemEntityImpl.lastMoveTime.set(0); + + loc = new SimulatedLocation(MutableMap.of("name", "loc")); + + model = new DefaultBalanceablePoolModel<Entity, Entity>("pool-model"); + + app = TestApplication.Factory.newManagedInstanceForTests(); + containerGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class) + .displayName("containerGroup") + .configure(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(MockContainerEntity.class))); + itemGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class) + .displayName("itemGroup") + .configure(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(MockItemEntity.class))); + pool = app.createAndManageChild(EntitySpec.create(BalanceableWorkerPool.class)); + pool.setContents(containerGroup, itemGroup); + policy = new LoadBalancingPolicy(MutableMap.of("minPeriodBetweenExecs", 1), TEST_METRIC, model); + pool.addPolicy(policy); + app.start(ImmutableList.of(loc)); + } + + @AfterMethod(alwaysRun=true) + public void after() { + if (policy != null) policy.destroy(); + if (app != null) Entities.destroyAll(app.getManagementContext()); + } + + // Using this utility, as it gives more info about the workrates of all containers rather than just the one that differs + protected void assertWorkrates(Collection<MockContainerEntity> containers, Collection<Double> expectedC, double precision) { + Iterable<Double> actual = Iterables.transform(containers, new Function<MockContainerEntity, Double>() { + public Double apply(MockContainerEntity input) { + return getContainerWorkrate(input); + }}); + + List<Double> expected = Lists.newArrayList(expectedC); + String errMsg = "actual="+actual+"; expected="+expected; + assertEquals(containers.size(), expected.size(), errMsg); + for (int i = 0; i < containers.size(); i++) { + assertEquals(Iterables.get(actual, i), expected.get(i), precision, errMsg); + } + } + + protected void assertWorkratesEventually(Collection<MockContainerEntity> containers, Iterable<? extends Movable> items, Collection<Double> expected) { + assertWorkratesEventually(containers, items, expected, 0d); + } + + /** + * Asserts that the given container have the given expected workrates (by querying the containers directly). + * Accepts an accuracy of "precision" for each container's workrate. + */ + protected void assertWorkratesEventually(final Collection<MockContainerEntity> containers, final Iterable<? extends Movable> items, final Collection<Double> expected, final double precision) { + try { + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + public void run() { + assertWorkrates(containers, expected, precision); + }}); + } catch (AssertionError e) { + String errMsg = e.getMessage()+"; "+verboseDumpToString(containers, items); + throw new RuntimeException(errMsg, e); + } + } + + // Using this utility, as it gives more info about the workrates of all containers rather than just the one that differs + protected void assertWorkratesContinually(List<MockContainerEntity> containers, Iterable<? extends Movable> items, List<Double> expected) { + assertWorkratesContinually(containers, items, expected, 0d); + } + + /** + * Asserts that the given containers have the given expected workrates (by querying the containers directly) + * continuously for SHORT_WAIT_MS. + * Accepts an accuracy of "precision" for each container's workrate. + */ + protected void assertWorkratesContinually(final List<MockContainerEntity> containers, Iterable<? extends Movable> items, final List<Double> expected, final double precision) { + try { + Asserts.succeedsContinually(MutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() { + public void run() { + assertWorkrates(containers, expected, precision); + }}); + } catch (AssertionError e) { + String errMsg = e.getMessage()+"; "+verboseDumpToString(containers, items); + throw new RuntimeException(errMsg, e); + } + } + + protected String verboseDumpToString(Iterable<MockContainerEntity> containers, Iterable<? extends Movable> items) { + Iterable<Double> containerRates = Iterables.transform(containers, new Function<MockContainerEntity, Double>() { + @Override public Double apply(MockContainerEntity input) { + return (double) input.getWorkrate(); + }}); + + Map<MockContainerEntity, Set<Movable>> itemDistributionByContainer = Maps.newLinkedHashMap(); + for (MockContainerEntity container : containers) { + itemDistributionByContainer.put(container, container.getBalanceableItems()); + } + + Map<Movable, BalanceableContainer<?>> itemDistributionByItem = Maps.newLinkedHashMap(); + for (Movable item : items) { + itemDistributionByItem.put(item, item.getAttribute(Movable.CONTAINER)); + } + + String modelItemDistribution = model.itemDistributionToString(); + return "containers="+containers+"; containerRates="+containerRates + +"; itemDistributionByContainer="+itemDistributionByContainer + +"; itemDistributionByItem="+itemDistributionByItem + +"; model="+modelItemDistribution + +"; totalMoves="+MockItemEntityImpl.totalMoveCount + +"; lastMoveTime="+Time.makeDateString(MockItemEntityImpl.lastMoveTime.get()); + } + + protected MockContainerEntity newContainer(TestApplication app, String name, double lowThreshold, double highThreshold) { + return newAsyncContainer(app, name, lowThreshold, highThreshold, 0); + } + + /** + * Creates a new container that will take "delay" millis to complete its start-up. + */ + protected MockContainerEntity newAsyncContainer(TestApplication app, String name, double lowThreshold, double highThreshold, long delay) { + MockContainerEntity container = app.createAndManageChild(EntitySpec.create(MockContainerEntity.class) + .displayName(name) + .configure(MockContainerEntity.DELAY, delay) + .configure(LOW_THRESHOLD_CONFIG_KEY, lowThreshold) + .configure(HIGH_THRESHOLD_CONFIG_KEY, highThreshold)); + LOG.debug("Managed new container {}", container); + container.start(ImmutableList.of(loc)); + return container; + } + + protected static MockItemEntity newItem(TestApplication app, MockContainerEntity container, String name, double workrate) { + MockItemEntity item = app.createAndManageChild(EntitySpec.create(MockItemEntity.class) + .displayName(name)); + LOG.debug("Managing new item {} on container {}", item, container); + item.move(container); + ((EntityLocal)item).setAttribute(TEST_METRIC, (int)workrate); + return item; + } + + protected static MockItemEntity newLockedItem(TestApplication app, MockContainerEntity container, String name, double workrate) { + MockItemEntity item = app.createAndManageChild(EntitySpec.create(MockItemEntity.class) + .displayName(name) + .configure(Movable.IMMOVABLE, true)); + LOG.debug("Managed new item {} on container {}", item, container); + item.move(container); + ((EntityLocal)item).setAttribute(TEST_METRIC, (int)workrate); + return item; + } + + /** + * Asks the item directly for its workrate. + */ + protected static double getItemWorkrate(MockItemEntity item) { + Object result = item.getAttribute(TEST_METRIC); + return (result == null ? 0 : ((Number) result).doubleValue()); + } + + /** + * Asks the container for its items, and then each of those items directly for their workrates; returns the total. + */ + protected static double getContainerWorkrate(MockContainerEntity container) { + double result = 0.0; + Preconditions.checkNotNull(container, "container"); + for (Movable item : container.getBalanceableItems()) { + Preconditions.checkNotNull(item, "item in container"); + assertEquals(item.getContainerId(), container.getId()); + result += getItemWorkrate((MockItemEntity)item); + } + return result; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolTest.java new file mode 100644 index 0000000..18f6847 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/BalanceableWorkerPoolTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.test.entity.TestApplication; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.AbstractGroup; +import brooklyn.entity.basic.AbstractGroupImpl; +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.DynamicGroup; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.trait.Resizable; +import org.apache.brooklyn.location.basic.SimulatedLocation; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; + +public class BalanceableWorkerPoolTest { + + private static final Logger LOG = LoggerFactory.getLogger(BalanceableWorkerPoolTest.class); + + protected static final long TIMEOUT_MS = 10*1000; + protected static final long SHORT_WAIT_MS = 250; + + protected static final long CONTAINER_STARTUP_DELAY_MS = 100; + + protected TestApplication app; + protected SimulatedLocation loc; + protected BalanceableWorkerPool pool; + protected Group containerGroup; + protected Group itemGroup; + + @BeforeMethod(alwaysRun=true) + public void before() { + loc = new SimulatedLocation(MutableMap.of("name", "loc")); + + app = ApplicationBuilder.newManagedApp(TestApplication.class); + containerGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class) + .displayName("containerGroup") + .configure(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(MockContainerEntity.class))); + itemGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class) + .displayName("itemGroup") + .configure(DynamicGroup.ENTITY_FILTER, Predicates.instanceOf(MockItemEntity.class))); + pool = app.createAndManageChild(EntitySpec.create(BalanceableWorkerPool.class)); + pool.setContents(containerGroup, itemGroup); + + app.start(ImmutableList.of(loc)); + } + + @AfterMethod(alwaysRun=true) + public void after() { + if (app != null) Entities.destroyAll(app.getManagementContext()); + } + + @Test + public void testDefaultResizeFailsIfContainerGroupNotResizable() throws Exception { + try { + pool.resize(1); + fail(); + } catch (Exception e) { + if (Exceptions.getFirstThrowableOfType(e, UnsupportedOperationException.class) == null) throw e; + } + } + + @Test + public void testDefaultResizeCallsResizeOnContainerGroup() { + LocallyResizableGroup resizable = app.createAndManageChild(EntitySpec.create(LocallyResizableGroup.class)); + + BalanceableWorkerPool pool2 = app.createAndManageChild(EntitySpec.create(BalanceableWorkerPool.class)); + pool2.setContents(resizable, itemGroup); + Entities.manage(pool2); + + pool2.resize(123); + assertEquals(resizable.getCurrentSize(), (Integer) 123); + } + + @Test + public void testCustomResizableCalledWhenResizing() { + LocallyResizableGroup resizable = app.createAndManageChild(EntitySpec.create(LocallyResizableGroup.class)); + + pool.setResizable(resizable); + + pool.resize(123); + assertEquals(resizable.getCurrentSize(), (Integer)123); + } + + @ImplementedBy(LocallyResizableGroupImpl.class) + public static interface LocallyResizableGroup extends AbstractGroup, Resizable { + } + + public static class LocallyResizableGroupImpl extends AbstractGroupImpl implements LocallyResizableGroup { + private int size = 0; + + @Override + public Integer resize(Integer newSize) { + size = newSize; + return size; + } + @Override + public Integer getCurrentSize() { + return size; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupTest.java new file mode 100644 index 0000000..c256334 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import static org.testng.Assert.assertEquals; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.test.entity.TestApplication; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.DynamicGroup; +import brooklyn.entity.basic.Entities; +import org.apache.brooklyn.location.basic.SimulatedLocation; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +public class ItemsInContainersGroupTest { + + // all tests are 20ms or less, but use a big timeout just in case very slow machine! + private static final long TIMEOUT_MS = 15000; + + private TestApplication app; + private SimulatedLocation loc; + private Group containerGroup; + private ItemsInContainersGroup itemGroup; + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + loc = new SimulatedLocation(MutableMap.of("name", "loc")); + + app = ApplicationBuilder.newManagedApp(TestApplication.class); + containerGroup = app.createAndManageChild(EntitySpec.create(DynamicGroup.class) + .displayName("containerGroup") + .configure(DynamicGroup.ENTITY_FILTER, new Predicate<Entity>() { + public boolean apply(Entity input) { + return input instanceof MockContainerEntity && + input.getConfig(MockContainerEntity.MOCK_MEMBERSHIP) == "ingroup"; + }})); + itemGroup = app.createAndManageChild(EntitySpec.create(ItemsInContainersGroup.class) + .displayName("itemGroup")); + itemGroup.setContainers(containerGroup); + + app.start(ImmutableList.of(loc)); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (app != null) Entities.destroyAll(app.getManagementContext()); + } + + @Test + public void testSimpleMembership() throws Exception { + MockContainerEntity containerIn = newContainer(app, "A", "ingroup"); + MockItemEntity item1 = newItem(app, containerIn, "1"); + MockItemEntity item2 = newItem(app, containerIn, "2"); + + assertItemsEventually(item1, item2); + } + + @Test + public void testFilterIsAppliedToItems() throws Exception { + itemGroup.stop(); + Entities.unmanage(itemGroup); + + itemGroup = app.createAndManageChild(EntitySpec.create(ItemsInContainersGroup.class) + .displayName("itemGroupWithDispName2") + .configure(ItemsInContainersGroup.ITEM_FILTER, new Predicate<Entity>() { + public boolean apply(Entity input) { + return "2".equals(input.getDisplayName()); + }})); + itemGroup.setContainers(containerGroup); + + MockContainerEntity containerIn = newContainer(app, "A", "ingroup"); + MockItemEntity item1 = newItem(app, containerIn, "1"); + MockItemEntity item2 = newItem(app, containerIn, "2"); + + assertItemsEventually(item2); // does not include item1 + } + + @Test + public void testItemsInOtherContainersIgnored() throws Exception { + MockContainerEntity containerOut = newContainer(app, "A", "outgroup"); + MockItemEntity item1 = newItem(app, containerOut, "1"); + + assertItemsEventually(); + } + + @Test + public void testItemMovedInIsAdded() throws Exception { + MockContainerEntity containerIn = newContainer(app, "A", "ingroup"); + MockContainerEntity containerOut = newContainer(app, "A", "outgroup"); + MockItemEntity item1 = newItem(app, containerOut, "1"); + item1.move(containerIn); + + assertItemsEventually(item1); + } + + @Test + public void testItemMovedOutIsRemoved() throws Exception { + MockContainerEntity containerIn = newContainer(app, "A", "ingroup"); + MockContainerEntity containerOut = newContainer(app, "A", "outgroup"); + MockItemEntity item1 = newItem(app, containerIn, "1"); + assertItemsEventually(item1); + + item1.move(containerOut); + assertItemsEventually(); + } + + /* + * Previously could fail if... + * ItemsInContainersGroupImpl listener got notified of Movable.CONTAINER after entity was unmanaged + * (because being done in concurrent threads). + * This called ItemsInContainersGroupImpl.onItemMoved, which called addMember to add it back in again. + * In AbstractGroup.addMember, we now check if the entity is still managed, to + * ensure there is synchronization for concurrent calls to add/remove member. + */ + @Test + public void testItemUnmanagedIsRemoved() throws Exception { + MockContainerEntity containerIn = newContainer(app, "A", "ingroup"); + MockItemEntity item1 = newItem(app, containerIn, "1"); + assertItemsEventually(item1); + + Entities.unmanage(item1); + assertItemsEventually(); + } + + // TODO How to test this? Will it be used? + // Adding a new container then adding items to it is tested in many other methods. + @Test(enabled=false) + public void testContainerAddedWillAddItsItems() throws Exception { + } + + @Test + public void testContainerRemovedWillRemoveItsItems() throws Exception { + MockContainerEntity containerA = newContainer(app, "A", "ingroup"); + MockItemEntity item1 = newItem(app, containerA, "1"); + assertItemsEventually(item1); + + Entities.unmanage(containerA); + assertItemsEventually(); + } + + private void assertItemsEventually(final MockItemEntity... expected) { + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + public void run() { + assertEquals(ImmutableSet.copyOf(itemGroup.getMembers()), ImmutableSet.copyOf(expected)); + }}); + } + + private MockContainerEntity newContainer(TestApplication app, String name, String membership) { + MockContainerEntity container = app.createAndManageChild(EntitySpec.create(MockContainerEntity.class) + .displayName(name) + .configure(MockContainerEntity.MOCK_MEMBERSHIP, membership)); + container.start(ImmutableList.of(loc)); + return container; + } + + private static MockItemEntity newItem(TestApplication app, MockContainerEntity container, String name) { + MockItemEntity item = app.createAndManageChild(EntitySpec.create(MockItemEntity.class) + .displayName(name)); + item.move(container); + return item; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingModelTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingModelTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingModelTest.java new file mode 100644 index 0000000..d96f509 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingModelTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import static org.testng.Assert.assertEquals; + +import java.util.Collections; + +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class LoadBalancingModelTest { + + private static final double PRECISION = 0.00001; + + private MockContainerEntity container1 = new MockContainerEntityImpl(); + private MockContainerEntity container2 = new MockContainerEntityImpl(); + private MockItemEntity item1 = new MockItemEntityImpl(); + private MockItemEntity item2 = new MockItemEntityImpl(); + private MockItemEntity item3 = new MockItemEntityImpl(); + + private DefaultBalanceablePoolModel<MockContainerEntity, MockItemEntity> model; + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + model = new DefaultBalanceablePoolModel<MockContainerEntity, MockItemEntity>("myname"); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + // nothing to tear down; no management context created + } + + @Test + public void testPoolRatesCorrectlySumContainers() throws Exception { + model.onContainerAdded(container1, 10d, 20d); + model.onContainerAdded(container2, 11d, 22d); + + assertEquals(model.getPoolLowThreshold(), 10d+11d, PRECISION); + assertEquals(model.getPoolHighThreshold(), 20d+22d, PRECISION); + } + + @Test + public void testPoolRatesCorrectlySumItems() throws Exception { + model.onContainerAdded(container1, 10d, 20d); + model.onItemAdded(item1, container1, true); + model.onItemAdded(item2, container1, true); + + model.onItemWorkrateUpdated(item1, 1d); + assertEquals(model.getCurrentPoolWorkrate(), 1d, PRECISION); + + model.onItemWorkrateUpdated(item2, 2d); + assertEquals(model.getCurrentPoolWorkrate(), 1d+2d, PRECISION); + + model.onItemWorkrateUpdated(item2, 4d); + assertEquals(model.getCurrentPoolWorkrate(), 1d+4d, PRECISION); + + model.onItemRemoved(item1); + assertEquals(model.getCurrentPoolWorkrate(), 4d, PRECISION); + } + + @Test + public void testWorkrateUpdateAfterItemRemovalIsNotRecorded() throws Exception { + model.onContainerAdded(container1, 10d, 20d); + model.onItemAdded(item1, container1, true); + model.onItemRemoved(item1); + model.onItemWorkrateUpdated(item1, 123d); + + assertEquals(model.getCurrentPoolWorkrate(), 0d, PRECISION); + assertEquals(model.getContainerWorkrates().get(container1), 0d, PRECISION); + assertEquals(model.getItemWorkrate(item1), null); + } + + @Test + public void testItemMovedWillUpdateContainerWorkrates() throws Exception { + model.onContainerAdded(container1, 10d, 20d); + model.onContainerAdded(container2, 11d, 21d); + model.onItemAdded(item1, container1, false); + model.onItemWorkrateUpdated(item1, 123d); + + model.onItemMoved(item1, container2); + + assertEquals(model.getItemsForContainer(container1), Collections.emptySet()); + assertEquals(model.getItemsForContainer(container2), ImmutableSet.of(item1)); + assertEquals(model.getItemWorkrate(item1), 123d); + assertEquals(model.getTotalWorkrate(container1), 0d); + assertEquals(model.getTotalWorkrate(container2), 123d); + assertEquals(model.getItemWorkrates(container1), Collections.emptyMap()); + assertEquals(model.getItemWorkrates(container2), ImmutableMap.of(item1, 123d)); + assertEquals(model.getContainerWorkrates(), ImmutableMap.of(container1, 0d, container2, 123d)); + assertEquals(model.getCurrentPoolWorkrate(), 123d); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyConcurrencyTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyConcurrencyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyConcurrencyTest.java new file mode 100644 index 0000000..5a3b328 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyConcurrencyTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.test.entity.TestApplication; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.Entities; + +import com.google.common.collect.Lists; + +public class LoadBalancingPolicyConcurrencyTest extends AbstractLoadBalancingPolicyTest { + + private static final Logger LOG = LoggerFactory.getLogger(LoadBalancingPolicyConcurrencyTest.class); + + private static final double WORKRATE_JITTER = 2d; + private static final int NUM_CONTAINERS = 20; + private static final int WORKRATE_UPDATE_PERIOD_MS = 1000; + + private ScheduledExecutorService scheduledExecutor; + + @BeforeMethod(alwaysRun=true) + @Override + public void before() { + scheduledExecutor = Executors.newScheduledThreadPool(10); + super.before(); + } + + @AfterMethod(alwaysRun=true) + @Override + public void after() { + if (scheduledExecutor != null) scheduledExecutor.shutdownNow(); + super.after(); + } + + @Test + public void testSimplePeriodicWorkrateUpdates() { + List<MockItemEntity> items = Lists.newArrayList(); + List<MockContainerEntity> containers = Lists.newArrayList(); + + for (int i = 0; i < NUM_CONTAINERS; i++) { + containers.add(newContainer(app, "container"+i, 10, 30)); + } + for (int i = 0; i < NUM_CONTAINERS; i++) { + newItemWithPeriodicWorkrates(app, containers.get(0), "item"+i, 20); + } + + assertWorkratesEventually(containers, items, Collections.nCopies(NUM_CONTAINERS, 20d), WORKRATE_JITTER); + } + + @Test + public void testConcurrentlyAddContainers() { + final Queue<MockContainerEntity> containers = new ConcurrentLinkedQueue<MockContainerEntity>(); + final List<MockItemEntity> items = Lists.newArrayList(); + + containers.add(newContainer(app, "container-orig", 10, 30)); + + for (int i = 0; i < NUM_CONTAINERS; i++) { + items.add(newItemWithPeriodicWorkrates(app, containers.iterator().next(), "item"+i, 20)); + } + for (int i = 0; i < NUM_CONTAINERS-1; i++) { + final int index = i; + scheduledExecutor.submit(new Callable<Void>() { + @Override public Void call() { + containers.add(newContainer(app, "container"+index, 10, 30)); + return null; + }}); + } + + assertWorkratesEventually(containers, items, Collections.nCopies(NUM_CONTAINERS, 20d), WORKRATE_JITTER); + } + + @Test + public void testConcurrentlyAddItems() { + final Queue<MockItemEntity> items = new ConcurrentLinkedQueue<MockItemEntity>(); + final List<MockContainerEntity> containers = Lists.newArrayList(); + + for (int i = 0; i < NUM_CONTAINERS; i++) { + containers.add(newContainer(app, "container"+i, 10, 30)); + } + for (int i = 0; i < NUM_CONTAINERS; i++) { + final int index = i; + scheduledExecutor.submit(new Callable<Void>() { + @Override public Void call() { + items.add(newItemWithPeriodicWorkrates(app, containers.get(0), "item"+index, 20)); + return null; + }}); + } + assertWorkratesEventually(containers, items, Collections.nCopies(NUM_CONTAINERS, 20d), WORKRATE_JITTER); + } + + // TODO Got IndexOutOfBoundsException from containers.last() + @Test(groups="WIP", invocationCount=100) + public void testConcurrentlyRemoveContainers() { + List<MockItemEntity> items = Lists.newArrayList(); + final List<MockContainerEntity> containers = Lists.newArrayList(); + + for (int i = 0; i < NUM_CONTAINERS; i++) { + containers.add(newContainer(app, "container"+i, 15, 45)); + } + for (int i = 0; i < NUM_CONTAINERS; i++) { + items.add(newItemWithPeriodicWorkrates(app, containers.get(i), "item"+i, 20)); + } + + final List<MockContainerEntity> containersToStop = Lists.newArrayList(); + for (int i = 0; i < NUM_CONTAINERS/2; i++) { + containersToStop.add(containers.remove(0)); + } + for (final MockContainerEntity containerToStop : containersToStop) { + scheduledExecutor.submit(new Callable<Void>() { + @Override public Void call() { + try { + containerToStop.offloadAndStop(containers.get(containers.size()-1)); + Entities.unmanage(containerToStop); + } catch (Throwable t) { + LOG.error("Error stopping container "+containerToStop, t); + } + return null; + }}); + } + + assertWorkratesEventually(containers, items, Collections.nCopies((int)(NUM_CONTAINERS/2), 40d), WORKRATE_JITTER*2); + } + + @Test(groups="WIP") + public void testConcurrentlyRemoveItems() { + List<MockItemEntity> items = Lists.newArrayList(); + List<MockContainerEntity> containers = Lists.newArrayList(); + + for (int i = 0; i < NUM_CONTAINERS; i++) { + containers.add(newContainer(app, "container"+i, 15, 45)); + } + for (int i = 0; i < NUM_CONTAINERS*2; i++) { + items.add(newItemWithPeriodicWorkrates(app, containers.get(i%NUM_CONTAINERS), "item"+i, 20)); + } + // should now have item0 and item{0+NUM_CONTAINERS} on container0, etc + + for (int i = 0; i < NUM_CONTAINERS; i++) { + // not removing consecutive items as that would leave it balanced! + int indexToStop = (i < NUM_CONTAINERS/2) ? NUM_CONTAINERS : 0; + final MockItemEntity itemToStop = items.remove(indexToStop); + scheduledExecutor.submit(new Callable<Void>() { + @Override public Void call() { + try { + itemToStop.stop(); + Entities.unmanage(itemToStop); + } catch (Throwable t) { + LOG.error("Error stopping item "+itemToStop, t); + } + return null; + }}); + } + + assertWorkratesEventually(containers, items, Collections.nCopies(NUM_CONTAINERS, 20d), WORKRATE_JITTER); + } + + protected MockItemEntity newItemWithPeriodicWorkrates(TestApplication app, MockContainerEntity container, String name, double workrate) { + MockItemEntity item = newItem(app, container, name, workrate); + scheduleItemWorkrateUpdates(item, workrate, WORKRATE_JITTER); + return item; + } + + private void scheduleItemWorkrateUpdates(final MockItemEntity item, final double workrate, final double jitter) { + final AtomicReference<Future<?>> futureRef = new AtomicReference<Future<?>>(); + Future<?> future = scheduledExecutor.scheduleAtFixedRate( + new Runnable() { + @Override public void run() { + if (item.isStopped() && futureRef.get() != null) { + futureRef.get().cancel(true); + return; + } + double jitteredWorkrate = workrate + (random.nextDouble()*jitter*2 - jitter); + ((EntityLocal)item).setAttribute(TEST_METRIC, (int) Math.max(0, jitteredWorkrate)); + } + }, + 0, WORKRATE_UPDATE_PERIOD_MS, TimeUnit.MILLISECONDS); + futureRef.set(future); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicySoakTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicySoakTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicySoakTest.java new file mode 100644 index 0000000..8da494c --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicySoakTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.Entities; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +public class LoadBalancingPolicySoakTest extends AbstractLoadBalancingPolicyTest { + + private static final Logger LOG = LoggerFactory.getLogger(LoadBalancingPolicySoakTest.class); + + private static final long TIMEOUT_MS = 40*1000; + + @Test + public void testLoadBalancingQuickTest() { + RunConfig config = new RunConfig(); + config.numCycles = 1; + config.numContainers = 5; + config.numItems = 5; + config.lowThreshold = 200; + config.highThreshold = 300; + config.totalRate = (int) (config.numContainers*(0.95*config.highThreshold)); + + runLoadBalancingSoakTest(config); + } + + @Test + public void testLoadBalancingManyItemsQuickTest() { + RunConfig config = new RunConfig(); + config.numCycles = 1; + config.numContainers = 5; + config.numItems = 30; + config.lowThreshold = 200; + config.highThreshold = 300; + config.numContainerStopsPerCycle = 1; + config.numItemStopsPerCycle = 1; + config.totalRate = (int) (config.numContainers*(0.95*config.highThreshold)); + + runLoadBalancingSoakTest(config); + } + + @Test(groups={"Integration","Acceptance"}) // acceptance group, because it's slow to run many cycles + public void testLoadBalancingSoakTest() { + RunConfig config = new RunConfig(); + config.numCycles = 100; + config.numContainers = 5; + config.numItems = 5; + config.lowThreshold = 200; + config.highThreshold = 300; + config.totalRate = (int) (config.numContainers*(0.95*config.highThreshold)); + + runLoadBalancingSoakTest(config); + } + + @Test(groups={"Integration","Acceptance"}) // acceptance group, because it's slow to run many cycles + public void testLoadBalancingManyItemsSoakTest() { + RunConfig config = new RunConfig(); + config.numCycles = 100; + config.numContainers = 5; + config.numItems = 30; + config.lowThreshold = 200; + config.highThreshold = 300; + config.totalRate = (int) (config.numContainers*(0.95*config.highThreshold)); + config.numContainerStopsPerCycle = 3; + config.numItemStopsPerCycle = 10; + + runLoadBalancingSoakTest(config); + } + + @Test(groups={"Integration","Acceptance"}) + public void testLoadBalancingManyManyItemsTest() { + RunConfig config = new RunConfig(); + config.numCycles = 1; + config.numContainers = 5; + config.numItems = 1000; + config.lowThreshold = 2000; + config.highThreshold = 3000; + config.numContainerStopsPerCycle = 0; + config.numItemStopsPerCycle = 0; + config.totalRate = (int) (config.numContainers*(0.95*config.highThreshold)); + config.verbose = false; + + runLoadBalancingSoakTest(config); + } + + private void runLoadBalancingSoakTest(RunConfig config) { + final int numCycles = config.numCycles; + final int numContainers = config.numContainers; + final int numItems = config.numItems; + final double lowThreshold = config.lowThreshold; + final double highThreshold = config.highThreshold; + final int totalRate = config.totalRate; + final int numContainerStopsPerCycle = config.numContainerStopsPerCycle; + final int numItemStopsPerCycle = config.numItemStopsPerCycle; + final boolean verbose = config.verbose; + + MockItemEntityImpl.totalMoveCount.set(0); + + final List<MockContainerEntity> containers = new ArrayList<MockContainerEntity>(); + final List<MockItemEntity> items = new ArrayList<MockItemEntity>(); + + for (int i = 1; i <= numContainers; i++) { + MockContainerEntity container = newContainer(app, "container-"+i, lowThreshold, highThreshold); + containers.add(container); + } + for (int i = 1; i <= numItems; i++) { + MockItemEntity item = newItem(app, containers.get(0), "item-"+i, 5); + items.add(item); + } + + for (int i = 1; i <= numCycles; i++) { + LOG.info(LoadBalancingPolicySoakTest.class.getSimpleName()+": cycle "+i); + + // Stop items, and start others + for (int j = 1; j <= numItemStopsPerCycle; j++) { + int itemIndex = random.nextInt(numItems); + MockItemEntity itemToStop = items.get(itemIndex); + itemToStop.stop(); + LOG.debug("Unmanaging item {}", itemToStop); + Entities.unmanage(itemToStop); + items.set(itemIndex, newItem(app, containers.get(0), "item-"+(itemIndex+1)+"."+i+"."+j, 5)); + } + + // Repartition the load across the items + final List<Integer> itemRates = randomlyDivideLoad(numItems, totalRate, 0, (int)highThreshold); + + for (int j = 0; j < numItems; j++) { + MockItemEntity item = items.get(j); + ((EntityLocal)item).setAttribute(MockItemEntity.TEST_METRIC, itemRates.get(j)); + } + + // Stop containers, and start others + for (int j = 1; j <= numContainerStopsPerCycle; j++) { + int containerIndex = random.nextInt(numContainers); + MockContainerEntity containerToStop = containers.get(containerIndex); + containerToStop.offloadAndStop(containers.get((containerIndex+1)%numContainers)); + LOG.debug("Unmanaging container {}", containerToStop); + Entities.unmanage(containerToStop); + + MockContainerEntity containerToAdd = newContainer(app, "container-"+(containerIndex+1)+"."+i+"."+j, lowThreshold, highThreshold); + containers.set(containerIndex, containerToAdd); + } + + // Assert that the items become balanced again + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + @Override public void run() { + Iterable<Double> containerRates = Iterables.transform(containers, new Function<MockContainerEntity, Double>() { + @Override public Double apply(MockContainerEntity input) { + return (double) input.getWorkrate(); + }}); + + String errMsg; + if (verbose) { + errMsg = verboseDumpToString(containers, items)+"; itemRates="+itemRates; + } else { + errMsg = containerRates+"; totalMoves="+MockItemEntityImpl.totalMoveCount; + } + + // Check that haven't lost any items + // (as observed in one jenkins build failure: 2014-03-18; but that could also be + // explained by errMsg generated in the middle of a move) + List<Entity> itemsFromModel = Lists.newArrayList(); + List<Entity> itemsFromContainers = Lists.newArrayList(); + for (Entity container : model.getPoolContents()) { + itemsFromModel.addAll(model.getItemsForContainer(container)); + } + for (MockContainerEntity container : containers) { + itemsFromContainers.addAll(container.getBalanceableItems()); + } + Asserts.assertEqualsIgnoringOrder(itemsFromModel, items, true, errMsg); + Asserts.assertEqualsIgnoringOrder(itemsFromContainers, items, true, errMsg); + + // Check overall container rates are balanced + assertEquals(sum(containerRates), sum(itemRates), errMsg); + for (double containerRate : containerRates) { + assertTrue(containerRate >= lowThreshold, errMsg); + assertTrue(containerRate <= highThreshold, errMsg); + } + }}); + } + } + + private static class RunConfig { + int numCycles = 1; + int numContainers = 5; + int numItems = 5; + double lowThreshold = 200; + double highThreshold = 300; + int totalRate = (int) (5*(0.95*highThreshold)); + int numContainerStopsPerCycle = 1; + int numItemStopsPerCycle = 1; + boolean verbose = true; + } + + // Testing conveniences. + + private double sum(Iterable<? extends Number> vals) { + double total = 0;; + for (Number val : vals) { + total += val.doubleValue(); + } + return total; + } + + /** + * Distributes a given load across a number of items randomly. The variability in load for an item is dictated by the variance, + * but the total will always equal totalLoad. + * + * The distribution of load is skewed: one side of the list will have bigger values than the other. + * Which side is skewed will vary, so when balancing a policy will find that things have entirely changed. + * + * TODO This is not particularly good at distributing load, but it's random and skewed enough to force rebalancing. + */ + private List<Integer> randomlyDivideLoad(int numItems, int totalLoad, int min, int max) { + List<Integer> result = new ArrayList<Integer>(numItems); + int totalRemaining = totalLoad; + int variance = 3; + int skew = 3; + + for (int i = 0; i < numItems; i++) { + int itemsRemaining = numItems-i; + int itemFairShare = (totalRemaining/itemsRemaining); + double skewFactor = ((double)i/numItems)*2 - 1; // a number between -1 and 1, depending how far through the item set we are + int itemSkew = (int) (random.nextInt(skew)*skewFactor); + int itemLoad = itemFairShare + (random.nextInt(variance*2)-variance) + itemSkew; + itemLoad = Math.max(min, itemLoad); + itemLoad = Math.min(totalRemaining, itemLoad); + itemLoad = Math.min(max, itemLoad); + result.add(itemLoad); + totalRemaining -= itemLoad; + } + + if (random.nextBoolean()) Collections.reverse(result); + + assertTrue(sum(result) <= totalLoad, "totalLoad="+totalLoad+"; result="+result); + + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyTest.java new file mode 100644 index 0000000..a0166f9 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicyTest.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; + +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.Entities; +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class LoadBalancingPolicyTest extends AbstractLoadBalancingPolicyTest { + + // Expect no balancing to occur as container A isn't above the high threshold. + @Test + public void testNoopWhenWithinThresholds() { + MockContainerEntity containerA = newContainer(app, "A", 10, 100); + MockContainerEntity containerB = newContainer(app, "B", 20, 60); + MockItemEntity item1 = newItem(app, containerA, "1", 10); + MockItemEntity item2 = newItem(app, containerA, "2", 10); + MockItemEntity item3 = newItem(app, containerA, "3", 10); + MockItemEntity item4 = newItem(app, containerA, "4", 10); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3, item4), + ImmutableList.of(40d, 0d)); + } + + @Test + public void testNoopWhenAlreadyBalanced() { + MockContainerEntity containerA = newContainer(app, "A", 20, 80); + MockContainerEntity containerB = newContainer(app, "B", 20, 80); + MockItemEntity item1 = newItem(app, containerA, "1", 10); + MockItemEntity item2 = newItem(app, containerA, "2", 30); + MockItemEntity item3 = newItem(app, containerB, "3", 20); + MockItemEntity item4 = newItem(app, containerB, "4", 20); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3, item4), + ImmutableList.of(40d, 40d)); + assertEquals(containerA.getBalanceableItems(), ImmutableSet.of(item1, item2)); + assertEquals(containerB.getBalanceableItems(), ImmutableSet.of(item3, item4)); + } + + // Expect 20 units of workload to be migrated from hot container (A) to cold (B). + @Test + public void testSimpleBalancing() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 25); + MockContainerEntity containerB = newContainer(app, "B", 20, 60); + MockItemEntity item1 = newItem(app, containerA, "1", 10); + MockItemEntity item2 = newItem(app, containerA, "2", 10); + MockItemEntity item3 = newItem(app, containerA, "3", 10); + MockItemEntity item4 = newItem(app, containerA, "4", 10); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3, item4), + ImmutableList.of(20d, 20d)); + } + + @Test + public void testSimpleBalancing2() { + MockContainerEntity containerA = newContainer(app, "A", 20, 40); + MockContainerEntity containerB = newContainer(app, "B", 20, 40); + MockItemEntity item1 = newItem(app, containerA, "1", 0); + MockItemEntity item2 = newItem(app, containerB, "2", 40); + MockItemEntity item3 = newItem(app, containerB, "3", 20); + MockItemEntity item4 = newItem(app, containerB, "4", 20); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3, item4), + ImmutableList.of(40d, 40d)); + } + +// @Test +// public void testAdjustedItemNotMoved() { +// MockBalancingModel pool = new MockBalancingModel( +// containers( +// containerA, 20, 50, +// containerB, 20, 50), +// items( +// "item1", containerA, 0, +// "item2", containerB, -40, +// "item3", containerB, 20, +// "item4", containerB, 20) +// ); +// +// BalancingStrategy<String, String> policy = new BalancingStrategy<String, String>("Test", pool); +// policy.rebalance(); +// +// assertEquals((Object)pool.getItemsForContainer(containerA), ImmutableSet.of("item1", "item3", "item4"), pool.itemDistributionToString()); +// assertEquals((Object)pool.getItemsForContainer(containerB), ImmutableSet.of("item2"), pool.itemDistributionToString()); +// } + + @Test + public void testMultiMoveBalancing() { + MockContainerEntity containerA = newContainer(app, "A", 20, 50); + MockContainerEntity containerB = newContainer(app, "B", 20, 50); + MockItemEntity item1 = newItem(app, containerA, "1", 10); + MockItemEntity item2 = newItem(app, containerA, "2", 10); + MockItemEntity item3 = newItem(app, containerA, "3", 10); + MockItemEntity item4 = newItem(app, containerA, "4", 10); + MockItemEntity item5 = newItem(app, containerA, "5", 10); + MockItemEntity item6 = newItem(app, containerA, "6", 10); + MockItemEntity item7 = newItem(app, containerA, "7", 10); + MockItemEntity item8 = newItem(app, containerA, "8", 10); + MockItemEntity item9 = newItem(app, containerA, "9", 10); + MockItemEntity item10 = newItem(app, containerA, "10", 10); + + // non-deterministic which items will be moved; but can assert how many (given they all have same workrate) + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3, item4, item5, item6, item7, item8, item9, item10), + ImmutableList.of(50d, 50d)); + assertEquals(containerA.getBalanceableItems().size(), 5); + assertEquals(containerB.getBalanceableItems().size(), 5); + } + + @Test + public void testRebalanceWhenWorkratesChange() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 50); + MockContainerEntity containerB = newContainer(app, "B", 10, 50); + MockItemEntity item1 = newItem(app, containerA, "1", 0); + MockItemEntity item2 = newItem(app, containerA, "2", 0); + + ((EntityLocal)item1).setAttribute(MockItemEntity.TEST_METRIC, 40); + ((EntityLocal)item2).setAttribute(MockItemEntity.TEST_METRIC, 40); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2), + ImmutableList.of(40d, 40d)); + } + + // Expect no balancing to occur in hot pool (2 containers over-threshold at 40). + // On addition of new container, expect hot containers to offload 10 each. + @Test + public void testAddContainerWhenHot() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 30); + MockContainerEntity containerB = newContainer(app, "B", 10, 30); + MockItemEntity item1 = newItem(app, containerA, "1", 10); + MockItemEntity item2 = newItem(app, containerA, "2", 10); + MockItemEntity item3 = newItem(app, containerA, "3", 10); + MockItemEntity item4 = newItem(app, containerA, "4", 10); + MockItemEntity item5 = newItem(app, containerB, "5", 10); + MockItemEntity item6 = newItem(app, containerB, "6", 10); + MockItemEntity item7 = newItem(app, containerB, "7", 10); + MockItemEntity item8 = newItem(app, containerB, "8", 10); + // Both containers are over-threshold at this point; should not rebalance. + + MockContainerEntity containerC = newAsyncContainer(app, "C", 10, 30, CONTAINER_STARTUP_DELAY_MS); + // New container allows hot ones to offload work. + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB, containerC), + ImmutableList.of(item1, item2, item3, item4, item5, item6, item7, item8), + ImmutableList.of(30d, 30d, 20d)); + } + + // On addition of new container, expect no rebalancing to occur as no existing container is hot. + @Test + public void testAddContainerWhenCold() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 50); + MockContainerEntity containerB = newContainer(app, "B", 10, 50); + MockItemEntity item1 = newItem(app, containerA, "1", 10); + MockItemEntity item2 = newItem(app, containerA, "2", 10); + MockItemEntity item3 = newItem(app, containerA, "3", 10); + MockItemEntity item4 = newItem(app, containerA, "4", 10); + MockItemEntity item5 = newItem(app, containerB, "5", 10); + MockItemEntity item6 = newItem(app, containerB, "6", 10); + MockItemEntity item7 = newItem(app, containerB, "7", 10); + MockItemEntity item8 = newItem(app, containerB, "8", 10); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3, item4, item5, item6, item7, item8), + ImmutableList.of(40d, 40d)); + + MockContainerEntity containerC = newAsyncContainer(app, "C", 10, 50, CONTAINER_STARTUP_DELAY_MS); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB, containerC), + ImmutableList.of(item1, item2, item3, item4, item5, item6, item7, item8), + ImmutableList.of(40d, 40d, 0d)); + } + + // Expect no balancing to occur in cool pool (2 containers under-threshold at 30). + // On addition of new item, expect over-threshold container (A) to offload 20 to B. + @Test + public void testAddItem() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 50); + MockContainerEntity containerB = newContainer(app, "B", 10, 50); + MockItemEntity item1 = newItem(app, containerA, "1", 10); + MockItemEntity item2 = newItem(app, containerA, "2", 10); + MockItemEntity item3 = newItem(app, containerA, "3", 10); + MockItemEntity item4 = newItem(app, containerB, "4", 10); + MockItemEntity item5 = newItem(app, containerB, "5", 10); + MockItemEntity item6 = newItem(app, containerB, "6", 10); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3, item4, item5, item6), + ImmutableList.of(30d, 30d)); + + newItem(app, containerA, "7", 40); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3, item4, item5, item6), + ImmutableList.of(50d, 50d)); + } + + // FIXME Failed in build repeatedly (e.g. #1035), but couldn't reproduce locally yet with invocationCount=100 + @Test(groups="WIP") + public void testRemoveContainerCausesRebalancing() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 30); + MockContainerEntity containerB = newContainer(app, "B", 10, 30); + MockContainerEntity containerC = newContainer(app, "C", 10, 30); + MockItemEntity item1 = newItem(app, containerA, "1", 10); + MockItemEntity item2 = newItem(app, containerA, "2", 10); + MockItemEntity item3 = newItem(app, containerB, "3", 10); + MockItemEntity item4 = newItem(app, containerB, "4", 10); + MockItemEntity item5 = newItem(app, containerC, "5", 10); + MockItemEntity item6 = newItem(app, containerC, "6", 10); + + Entities.unmanage(containerC); + item5.move(containerA); + item6.move(containerA); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3, item4, item5, item6), + ImmutableList.of(30d, 30d)); + } + + @Test + public void testRemoveItemCausesRebalancing() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 30); + MockContainerEntity containerB = newContainer(app, "B", 10, 30); + MockItemEntity item1 = newItem(app, containerA, "1", 30); + MockItemEntity item2 = newItem(app, containerB, "2", 20); + MockItemEntity item3 = newItem(app, containerB, "3", 20); + + item1.stop(); + Entities.unmanage(item1); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3), + ImmutableList.of(20d, 20d)); + } + + @Test + public void testRebalancesAfterManualMove() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 50); + MockContainerEntity containerB = newContainer(app, "B", 10, 50); + MockItemEntity item1 = newItem(app, containerA, "1", 20); + MockItemEntity item2 = newItem(app, containerA, "2", 20); + MockItemEntity item3 = newItem(app, containerB, "3", 20); + MockItemEntity item4 = newItem(app, containerB, "4", 20); + + // Move everything onto containerA, and expect it to be automatically re-balanced + item3.move(containerA); + item4.move(containerA); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3, item4), + ImmutableList.of(40d, 40d)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testModelIncludesItemsAndContainersStartedBeforePolicyCreated() { + pool.removePolicy(policy); + policy.destroy(); + + // Set-up containers and items. + final MockContainerEntity containerA = newContainer(app, "A", 10, 100); + newItem(app, containerA, "1", 10); + + policy = new LoadBalancingPolicy(MutableMap.of(), TEST_METRIC, model); + pool.addPolicy(policy); + + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + public void run() { + assertEquals(model.getContainerWorkrates(), ImmutableMap.of(containerA, 10d)); + } + }); + } + + @Test + public void testLockedItemsNotMoved() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 50); + MockContainerEntity containerB = newContainer(app, "B", 10, 50); + MockItemEntity item1 = newLockedItem(app, containerA, "1", 40); + MockItemEntity item2 = newLockedItem(app, containerA, "2", 40); + + assertWorkratesContinually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2), + ImmutableList.of(80d, 0d)); + } + + @Test + public void testLockedItemsContributeToOverloadedMeasurements() { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 50); + MockContainerEntity containerB = newContainer(app, "B", 10, 50); + MockItemEntity item1 = newLockedItem(app, containerA, "1", 40); + MockItemEntity item2 = newItem(app, containerA, "2", 25); + MockItemEntity item3 = newItem(app, containerA, "3", 25); + + assertWorkratesEventually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3), + ImmutableList.of(40d, 50d)); + } + + @Test + public void testOverloadedLockedItemsPreventMoreWorkEnteringContainer() throws Exception { + // Set-up containers and items. + MockContainerEntity containerA = newContainer(app, "A", 10, 50); + MockContainerEntity containerB = newContainer(app, "B", 10, 50); + MockItemEntity item1 = newLockedItem(app, containerA, "1", 50); + Thread.sleep(1); // increase chances of item1's workrate having been received first + MockItemEntity item2 = newItem(app, containerB, "2", 30); + MockItemEntity item3 = newItem(app, containerB, "3", 30); + + assertWorkratesContinually( + ImmutableList.of(containerA, containerB), + ImmutableList.of(item1, item2, item3), + ImmutableList.of(50d, 60d)); + } + + @Test + public void testPolicyUpdatesModel() { + final MockContainerEntity containerA = newContainer(app, "A", 10, 20); + final MockContainerEntity containerB = newContainer(app, "B", 11, 21); + final MockItemEntity item1 = newItem(app, containerA, "1", 12); + final MockItemEntity item2 = newItem(app, containerB, "2", 13); + + Asserts.succeedsEventually(MutableMap.of("timeout", TIMEOUT_MS), new Runnable() { + public void run() { + assertEquals(model.getPoolSize(), 2); + assertEquals(model.getPoolContents(), ImmutableSet.of(containerA, containerB)); + assertEquals(model.getItemWorkrate(item1), 12d); + assertEquals(model.getItemWorkrate(item2), 13d); + + assertEquals(model.getParentContainer(item1), containerA); + assertEquals(model.getParentContainer(item2), containerB); + assertEquals(model.getContainerWorkrates(), ImmutableMap.of(containerA, 12d, containerB, 13d)); + + assertEquals(model.getPoolLowThreshold(), 10+11d); + assertEquals(model.getPoolHighThreshold(), 20+21d); + assertEquals(model.getCurrentPoolWorkrate(), 12+13d); + assertFalse(model.isHot()); + assertFalse(model.isCold()); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntity.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntity.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntity.java new file mode 100644 index 0000000..1937d9a --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntity.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import java.util.Map; + +import org.apache.brooklyn.api.entity.Effector; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.annotation.EffectorParam; +import brooklyn.entity.basic.AbstractGroup; +import brooklyn.entity.basic.MethodEffector; +import brooklyn.entity.trait.Startable; +import brooklyn.event.basic.BasicConfigKey; + +@ImplementedBy(MockContainerEntityImpl.class) +public interface MockContainerEntity extends AbstractGroup, BalanceableContainer<Movable>, Startable { + + @SetFromFlag("membership") + public static final ConfigKey<String> MOCK_MEMBERSHIP = new BasicConfigKey<String>( + String.class, "mock.container.membership", "For testing ItemsInContainersGroup"); + + @SetFromFlag("delay") + public static final ConfigKey<Long> DELAY = new BasicConfigKey<Long>( + Long.class, "mock.container.delay", "", 0L); + + public static final Effector<Void> OFFLOAD_AND_STOP = new MethodEffector<Void>(MockContainerEntity.class, "offloadAndStop"); + + public void lock(); + + public void unlock(); + + public int getWorkrate(); + + public Map<Entity, Double> getItemUsage(); + + public void addItem(Entity item); + + public void removeItem(Entity item); + + public void offloadAndStop(@EffectorParam(name="otherContianer") MockContainerEntity otherContainer); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntityImpl.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntityImpl.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntityImpl.java new file mode 100644 index 0000000..de9da37 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockContainerEntityImpl.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.event.AttributeSensor; +import org.apache.brooklyn.api.location.Location; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.AbstractGroupImpl; +import brooklyn.entity.basic.Attributes; +import brooklyn.util.collections.MutableList; +import brooklyn.util.time.Time; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + + +public class MockContainerEntityImpl extends AbstractGroupImpl implements MockContainerEntity { + + private static final Logger LOG = LoggerFactory.getLogger(MockContainerEntity.class); + + volatile boolean offloading; + volatile boolean running; + + ReentrantLock _lock = new ReentrantLock(); + + @Override + public <T> T setAttribute(AttributeSensor<T> attribute, T val) { + if (LOG.isDebugEnabled()) LOG.debug("Mocks: container {} setting {} to {}", new Object[] {this, attribute, val}); + return super.setAttribute(attribute, val); + } + + @Override + public void lock() { + _lock.lock(); + if (!running) { + _lock.unlock(); + throw new IllegalStateException("Container lock "+this+"; it is not running"); + } + } + + @Override + public void unlock() { + _lock.unlock(); + } + + @Override + public int getWorkrate() { + int result = 0; + for (Entity member : getMembers()) { + Integer memberMetric = member.getAttribute(MockItemEntity.TEST_METRIC); + result += ((memberMetric != null) ? memberMetric : 0); + } + return result; + } + + @Override + public Map<Entity, Double> getItemUsage() { + Map<Entity, Double> result = Maps.newLinkedHashMap(); + for (Entity member : getMembers()) { + Map<Entity, Double> memberItemUsage = member.getAttribute(MockItemEntity.ITEM_USAGE_METRIC); + if (memberItemUsage != null) { + for (Map.Entry<Entity, Double> entry : memberItemUsage.entrySet()) { + double val = (result.containsKey(entry.getKey()) ? result.get(entry.getKey()) : 0d); + val += ((entry.getValue() != null) ? entry.getValue() : 0); + result.put(entry.getKey(), val); + } + } + } + return result; + } + + @Override + public void addItem(Entity item) { + if (LOG.isDebugEnabled()) LOG.debug("Mocks: adding item {} to container {}", item, this); + if (!running || offloading) throw new IllegalStateException("Container "+getDisplayName()+" is not running; cannot add item "+item); + addMember(item); + emit(ITEM_ADDED, item); + } + + @Override + public void removeItem(Entity item) { + if (LOG.isDebugEnabled()) LOG.debug("Mocks: removing item {} from container {}", item, this); + if (!running) throw new IllegalStateException("Container "+getDisplayName()+" is not running; cannot remove item "+item); + removeMember(item); + emit(ITEM_REMOVED, item); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public Set<Movable> getBalanceableItems() { + return (Set) Sets.newLinkedHashSet(getMembers()); + } + + public String toString() { + return "MockContainer["+getDisplayName()+"]"; + } + + private long getDelay() { + return getConfig(DELAY); + } + + @Override + public void start(Collection<? extends Location> locs) { + if (LOG.isDebugEnabled()) LOG.debug("Mocks: starting container {}", this); + _lock.lock(); + try { + Time.sleep(getDelay()); + running = true; + addLocations(locs); + emit(Attributes.LOCATION_CHANGED, null); + setAttribute(SERVICE_UP, true); + } finally { + _lock.unlock(); + } + } + + @Override + public void stop() { + if (LOG.isDebugEnabled()) LOG.debug("Mocks: stopping container {}", this); + _lock.lock(); + try { + running = false; + Time.sleep(getDelay()); + setAttribute(SERVICE_UP, false); + } finally { + _lock.unlock(); + } + } + + private void stopWithoutLock() { + running = false; + Time.sleep(getDelay()); + setAttribute(SERVICE_UP, false); + } + + public void offloadAndStop(final MockContainerEntity otherContainer) { + if (LOG.isDebugEnabled()) LOG.debug("Mocks: offloading container {} to {} (items {})", new Object[] {this, otherContainer, getBalanceableItems()}); + runWithLock(ImmutableList.of(this, otherContainer), new Runnable() { + public void run() { + offloading = false; + for (Movable item : getBalanceableItems()) { + ((MockItemEntity)item).moveNonEffector(otherContainer); + } + if (LOG.isDebugEnabled()) LOG.debug("Mocks: stopping offloaded container {}", this); + stopWithoutLock(); + }}); + } + + @Override + public void restart() { + if (LOG.isDebugEnabled()) LOG.debug("Mocks: restarting {}", this); + throw new UnsupportedOperationException(); + } + + public static void runWithLock(List<MockContainerEntity> entitiesToLock, Runnable r) { + List<MockContainerEntity> entitiesToLockCopy = MutableList.copyOf(Iterables.filter(entitiesToLock, Predicates.notNull())); + List<MockContainerEntity> entitiesLocked = Lists.newArrayList(); + Collections.sort(entitiesToLockCopy, new Comparator<MockContainerEntity>() { + public int compare(MockContainerEntity o1, MockContainerEntity o2) { + return o1.getId().compareTo(o2.getId()); + }}); + + try { + for (MockContainerEntity it : entitiesToLockCopy) { + it.lock(); + entitiesLocked.add(it); + } + + r.run(); + + } finally { + for (MockContainerEntity it : entitiesLocked) { + it.unlock(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockItemEntity.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockItemEntity.java b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockItemEntity.java new file mode 100644 index 0000000..83a54c0 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/loadbalancing/MockItemEntity.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import java.util.Map; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.api.event.AttributeSensor; + +import brooklyn.event.basic.Sensors; + +import com.google.common.reflect.TypeToken; + +@ImplementedBy(MockItemEntityImpl.class) +public interface MockItemEntity extends Entity, Movable { + + public static final AttributeSensor<Integer> TEST_METRIC = Sensors.newIntegerSensor( + "test.metric", "Dummy workrate for test entities"); + + @SuppressWarnings("serial") + public static final AttributeSensor<Map<Entity, Double>> ITEM_USAGE_METRIC = Sensors.newSensor( + new TypeToken<Map<Entity, Double>>() {}, "test.itemUsage.metric", "Dummy item usage for test entities"); + + public boolean isStopped(); + + public void moveNonEffector(Entity rawDestination); + + public void stop(); +}