Add JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS - Prevent more than the given number of concurrent calls to create VMs for a given JcloudsLocation instance. - Required in various clouds (sometimes, at least!) such as aws-ec2 where one gets http response 503 RequestLimitExceeded when trying to provision 18 machines concurrently.
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/4265e664 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/4265e664 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/4265e664 Branch: refs/heads/0.6.0 Commit: 4265e66496e379f1d0a367cdcd5f5bcc4dfe7a12 Parents: 2fc1caf Author: Aled Sage <[email protected]> Authored: Wed Oct 30 11:06:03 2013 +0000 Committer: Aled Sage <[email protected]> Committed: Wed Nov 6 16:12:02 2013 +0000 ---------------------------------------------------------------------- .../location/jclouds/JcloudsLocation.java | 56 ++++-- .../location/jclouds/JcloudsLocationConfig.java | 7 + .../location/jclouds/JcloudsLocationTest.java | 191 +++++++++++++++++-- 3 files changed, 224 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4265e664/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java ---------------------------------------------------------------------- diff --git a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java index 135cbcd..736a461 100644 --- a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java +++ b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocation.java @@ -19,6 +19,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -162,6 +164,14 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im } setCreationString(getConfigBag()); + + if (getConfig(MACHINE_CREATION_SEMAPHORE) == null) { + Integer maxConcurrent = getConfig(MAX_CONCURRENT_MACHINE_CREATIONS); + if (maxConcurrent == null || maxConcurrent < 1) { + throw new IllegalStateException(MAX_CONCURRENT_MACHINE_CREATIONS.getName() + " must be >= 1, but was "+maxConcurrent); + } + setConfig(MACHINE_CREATION_SEMAPHORE, new Semaphore(maxConcurrent, true)); + } } @Override @@ -178,7 +188,8 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im return getManagementContext().getLocationManager().createLocation(LocationSpec.create(getClass()) .parent(this) .configure(getRawLocalConfigBag().getAllConfig()) - .configure(newFlags)); + .configure(newFlags) + .configure(MACHINE_CREATION_SEMAPHORE, getMachineCreationSemaphore())); } @Override @@ -232,6 +243,10 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im USER, JCLOUDS_KEY_USERNAME); } + protected Semaphore getMachineCreationSemaphore() { + return checkNotNull(getConfig(MACHINE_CREATION_SEMAPHORE), MACHINE_CREATION_SEMAPHORE.getName()); + } + protected Collection<JcloudsLocationCustomizer> getCustomizers(ConfigBag setup) { JcloudsLocationCustomizer customizer = setup.get(JCLOUDS_LOCATION_CUSTOMIZER); Collection<JcloudsLocationCustomizer> customizers = setup.get(JCLOUDS_LOCATION_CUSTOMIZERS); @@ -386,19 +401,36 @@ public class JcloudsLocation extends AbstractCloudMachineProvisioningLocation im try { LOG.info("Creating VM in "+setup.getDescription()+" for "+this); - Template template = buildTemplate(computeService, setup); - LoginCredentials initialCredentials = initUserTemplateOptions(template, setup); - for (JcloudsLocationCustomizer customizer : getCustomizers(setup)) { - customizer.customize(this, computeService, template.getOptions()); + LoginCredentials initialCredentials; + Set<? extends NodeMetadata> nodes; + Semaphore machineCreationSemaphore = getMachineCreationSemaphore(); + boolean acquired = machineCreationSemaphore.tryAcquire(0, TimeUnit.SECONDS); + if (!acquired) { + LOG.info("Waiting in {} for machine-creation permit ({} other queuing requests already)", new Object[] {this, machineCreationSemaphore.getQueueLength()}); + Stopwatch stopwatch = new Stopwatch().start(); + machineCreationSemaphore.acquire(); + LOG.info("Acquired in {} machine-creation permit, after waiting {}", this, Time.makeTimeStringRounded(stopwatch)); + } else { + LOG.info("Acquired in {} machine-creation permit immediately", this); + } + try { + Template template = buildTemplate(computeService, setup); + initialCredentials = initUserTemplateOptions(template, setup); + for (JcloudsLocationCustomizer customizer : getCustomizers(setup)) { + customizer.customize(this, computeService, template.getOptions()); + } + LOG.debug("jclouds using template {} / options {} to provision machine in {}", new Object[] { + template, template.getOptions(), setup.getDescription()}); + + if (!setup.getUnusedConfig().isEmpty()) + LOG.debug("NOTE: unused flags passed to obtain VM in "+setup.getDescription()+": "+ + setup.getUnusedConfig()); + + nodes = computeService.createNodesInGroup(groupId, 1, template); + } finally { + machineCreationSemaphore.release(); } - LOG.debug("jclouds using template {} / options {} to provision machine in {}", new Object[] { - template, template.getOptions(), setup.getDescription()}); - - if (!setup.getUnusedConfig().isEmpty()) - LOG.debug("NOTE: unused flags passed to obtain VM in "+setup.getDescription()+": "+ - setup.getUnusedConfig()); - Set<? extends NodeMetadata> nodes = computeService.createNodesInGroup(groupId, 1, template); node = Iterables.getOnlyElement(nodes, null); LOG.debug("jclouds created {} for {}", node, setup.getDescription()); if (node == null) http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4265e664/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java ---------------------------------------------------------------------- diff --git a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java index c634482..b01a355 100644 --- a/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java +++ b/locations/jclouds/src/main/java/brooklyn/location/jclouds/JcloudsLocationConfig.java @@ -2,6 +2,7 @@ package brooklyn.location.jclouds; import java.io.File; import java.util.Collection; +import java.util.concurrent.Semaphore; import org.jclouds.Constants; import org.jclouds.compute.domain.TemplateBuilder; @@ -143,6 +144,12 @@ public interface JcloudsLocationConfig extends CloudLocationConfig { public static final ConfigKey<Integer> MACHINE_CREATE_ATTEMPTS = ConfigKeys.newIntegerConfigKey( "machineCreateAttempts", "Number of times to retry if jclouds fails to create a VM", 1); + public static final ConfigKey<Integer> MAX_CONCURRENT_MACHINE_CREATIONS = ConfigKeys.newIntegerConfigKey( + "maxConcurrentMachineCreations", "Maximum number of concurrent machine-creations", Integer.MAX_VALUE); + + public static final ConfigKey<Semaphore> MACHINE_CREATION_SEMAPHORE = ConfigKeys.newConfigKey( + Semaphore.class, "machineCreationSemaphore", "Semaphore for controlling concurrent machine creation", null); + // TODO // "noDefaultSshKeys" - hints that local ssh keys should not be read as defaults http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/4265e664/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java ---------------------------------------------------------------------- diff --git a/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java b/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java index b859a23..aaf9734 100644 --- a/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java +++ b/locations/jclouds/src/test/java/brooklyn/location/jclouds/JcloudsLocationTest.java @@ -1,6 +1,11 @@ package brooklyn.location.jclouds; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; @@ -13,18 +18,22 @@ import org.testng.annotations.Test; import brooklyn.config.BrooklynProperties; import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; import brooklyn.entity.basic.Entities; import brooklyn.location.LocationSpec; import brooklyn.location.NoMachinesAvailableException; import brooklyn.management.internal.LocalManagementContext; +import brooklyn.test.Asserts; import brooklyn.util.collections.MutableMap; import brooklyn.util.config.ConfigBag; import brooklyn.util.exceptions.Exceptions; +import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.reflect.TypeToken; /** * @author Shane Witbeck @@ -35,6 +44,8 @@ public class JcloudsLocationTest implements JcloudsLocationConfig { new RuntimeException("early termination for test"); public static class BailOutJcloudsLocation extends JcloudsLocation { + public static final ConfigKey<Function<ConfigBag,Void>> BUILD_TEMPLATE_INTERCEPTOR = ConfigKeys.newConfigKey(new TypeToken<Function<ConfigBag,Void>>() {}, "buildtemplateinterceptor"); + ConfigBag lastConfigBag; public BailOutJcloudsLocation() { @@ -48,9 +59,10 @@ public class JcloudsLocationTest implements JcloudsLocationConfig { @Override protected Template buildTemplate(ComputeService computeService, ConfigBag config) { lastConfigBag = config; + if (getConfig(BUILD_TEMPLATE_INTERCEPTOR) != null) getConfig(BUILD_TEMPLATE_INTERCEPTOR).apply(config); throw BAIL_OUT_FOR_TESTING; } - protected synchronized void tryObtainAndCheck(Map<?,?> flags, Predicate<ConfigBag> test) { + protected void tryObtainAndCheck(Map<?,?> flags, Predicate<? super ConfigBag> test) { try { obtain(flags); } catch (NoMachinesAvailableException e) { @@ -110,34 +122,49 @@ public class JcloudsLocationTest implements JcloudsLocationConfig { } protected BailOutJcloudsLocation newSampleBailOutJcloudsLocationForTesting() { + return newSampleBailOutJcloudsLocationForTesting(ImmutableMap.<ConfigKey<?>,Object>of()); + } + + protected BailOutJcloudsLocation newSampleBailOutJcloudsLocationForTesting(Map<?,?> config) { + Map<ConfigKey<?>,?> allConfig = MutableMap.<ConfigKey<?>,Object>builder() + .put(IMAGE_ID, "bogus") + .put(CLOUD_PROVIDER, "aws-ec2") + .put(ACCESS_IDENTITY, "bogus") + .put(CLOUD_REGION_ID, "bogus") + .put(ACCESS_CREDENTIAL, "bogus") + .put(USER, "fred") + .put(MIN_RAM, 16) + .putAll((Map)config) + .build(); return managementContext.getLocationManager().createLocation(LocationSpec.create(BailOutJcloudsLocation.class) - .configure(MutableMap.of( - IMAGE_ID, "bogus", - CLOUD_PROVIDER, "aws-ec2", - ACCESS_IDENTITY, "bogus", - CLOUD_REGION_ID, "bogus", - ACCESS_CREDENTIAL, "bogus", - USER, "fred", - MIN_RAM, 16))); + .configure(allConfig)); } protected BailOutWithTemplateJcloudsLocation newSampleBailOutWithTemplateJcloudsLocation() { + return newSampleBailOutWithTemplateJcloudsLocation(ImmutableMap.<ConfigKey<?>,Object>of()); + } + + protected BailOutWithTemplateJcloudsLocation newSampleBailOutWithTemplateJcloudsLocation(Map<?,?> config) { String identity = (String) brooklynProperties.get("brooklyn.location.jclouds.aws-ec2.identity"); if (identity == null) identity = (String) brooklynProperties.get("brooklyn.jclouds.aws-ec2.identity"); String credential = (String) brooklynProperties.get("brooklyn.location.jclouds.aws-ec2.credential"); if (credential == null) identity = (String) brooklynProperties.get("brooklyn.jclouds.aws-ec2.credential"); + Map<ConfigKey<?>,?> allConfig = MutableMap.<ConfigKey<?>,Object>builder() + .put(CLOUD_PROVIDER, "aws-ec2") + .put(CLOUD_REGION_ID, "eu-west-1") + .put(IMAGE_ID, "us-east-1/ami-7d7bfc14") // so it runs faster, without loading all EC2 images + .put(ACCESS_IDENTITY, identity) + .put(ACCESS_CREDENTIAL, credential) + .put(USER, "fred") + .put(INBOUND_PORTS, "[22, 80, 9999]") + .putAll((Map)config) + .build(); + return managementContext.getLocationManager().createLocation(LocationSpec.create(BailOutWithTemplateJcloudsLocation.class) - .configure(MutableMap.of( - CLOUD_PROVIDER, "aws-ec2", - CLOUD_REGION_ID, "eu-west-1", - IMAGE_ID, "us-east-1/ami-7d7bfc14", // so it runs faster, without loading all EC2 images - ACCESS_IDENTITY, identity, - ACCESS_CREDENTIAL, credential, - USER, "fred", - INBOUND_PORTS, "[22, 80, 9999]"))); + .configure(allConfig)); } - + public static Predicate<ConfigBag> checkerFor(final String user, final Integer minRam, final Integer minCores) { return new Predicate<ConfigBag>() { @Override @@ -266,5 +293,133 @@ public class JcloudsLocationTest implements JcloudsLocationConfig { Assert.assertEquals(jcloudsLocation.template.getOptions().getInboundPorts(), ports); } + @Test + public void testCreateWithMaxConcurrentCallsUnboundedByDefault() throws Exception { + final int numCalls = 20; + ConcurrencyTracker interceptor = new ConcurrencyTracker(); + ExecutorService executor = Executors.newCachedThreadPool(); + + try { + final BailOutJcloudsLocation jcloudsLocation = newSampleBailOutJcloudsLocationForTesting(ImmutableMap.of(BailOutJcloudsLocation.BUILD_TEMPLATE_INTERCEPTOR, interceptor)); + + for (int i = 0; i < numCalls; i++) { + executor.execute(new Runnable() { + @Override public void run() { + jcloudsLocation.tryObtainAndCheck(MutableMap.of(), Predicates.alwaysTrue()); + }}); + } + + interceptor.assertCallCountEventually(numCalls); + + interceptor.unblock(); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + + } finally { + executor.shutdownNow(); + } + } + + @Test(groups="Integration") // because takes 1 sec + public void testCreateWithMaxConcurrentCallsRespectsConfig() throws Exception { + final int numCalls = 4; + final int maxConcurrentCreations = 2; + ConcurrencyTracker interceptor = new ConcurrencyTracker(); + ExecutorService executor = Executors.newCachedThreadPool(); + + try { + final BailOutJcloudsLocation jcloudsLocation = newSampleBailOutJcloudsLocationForTesting(ImmutableMap.of( + BailOutJcloudsLocation.BUILD_TEMPLATE_INTERCEPTOR, interceptor, + JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS, maxConcurrentCreations)); + + for (int i = 0; i < numCalls; i++) { + executor.execute(new Runnable() { + @Override public void run() { + jcloudsLocation.tryObtainAndCheck(MutableMap.of(), Predicates.alwaysTrue()); + }}); + } + + interceptor.assertCallCountEventually(maxConcurrentCreations); + interceptor.assertCallCountContinually(maxConcurrentCreations); + + interceptor.unblock(); + interceptor.assertCallCountEventually(numCalls); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + + } finally { + executor.shutdownNow(); + } + } + + @Test(groups="Integration") // because takes 1 sec + public void testCreateWithMaxConcurrentCallsAppliesToSubLocations() throws Exception { + final int numCalls = 4; + final int maxConcurrentCreations = 2; + ConcurrencyTracker interceptor = new ConcurrencyTracker(); + ExecutorService executor = Executors.newCachedThreadPool(); + + try { + final BailOutJcloudsLocation jcloudsLocation = newSampleBailOutJcloudsLocationForTesting(ImmutableMap.of( + BailOutJcloudsLocation.BUILD_TEMPLATE_INTERCEPTOR, interceptor, + JcloudsLocation.MAX_CONCURRENT_MACHINE_CREATIONS, maxConcurrentCreations)); + + + for (int i = 0; i < numCalls; i++) { + final BailOutJcloudsLocation subLocation = (BailOutJcloudsLocation) jcloudsLocation.newSubLocation(MutableMap.of()); + executor.execute(new Runnable() { + @Override public void run() { + subLocation.tryObtainAndCheck(MutableMap.of(), Predicates.alwaysTrue()); + }}); + } + + interceptor.assertCallCountEventually(maxConcurrentCreations); + interceptor.assertCallCountContinually(maxConcurrentCreations); + + interceptor.unblock(); + interceptor.assertCallCountEventually(numCalls); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + + } finally { + executor.shutdownNow(); + } + } + + public static class ConcurrencyTracker implements Function<ConfigBag,Void> { + final AtomicInteger concurrentCallsCounter = new AtomicInteger(); + final CountDownLatch continuationLatch = new CountDownLatch(1); + + @Override public Void apply(ConfigBag input) { + concurrentCallsCounter.incrementAndGet(); + try { + continuationLatch.await(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + return null; + } + + public void unblock() { + continuationLatch.countDown(); + } + + public void assertCallCountEventually(final int expected) { + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + Assert.assertEquals(concurrentCallsCounter.get(), expected); + } + }); + } + + public void assertCallCountContinually(final int expected) { + Asserts.succeedsContinually(new Runnable() { + @Override public void run() { + Assert.assertEquals(concurrentCallsCounter.get(), expected); + } + }); + } + } + // TODO more tests, where flags come in from resolver, named locations, etc }
