Fixed bucket creation
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7d0467d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7d0467d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7d0467d8 Branch: refs/heads/master Commit: 7d0467d887629225b919d9fce93d5abf3c9a0b20 Parents: 4f70aa1 Author: Martin Harris <[email protected]> Authored: Wed Jun 25 16:44:11 2014 +0100 Committer: Martin Harris <[email protected]> Committed: Mon Aug 18 15:45:43 2014 +0100 ---------------------------------------------------------------------- .../nosql/couchbase/CouchbaseClusterImpl.java | 47 +++++++- .../nosql/couchbase/CouchbaseNodeSshDriver.java | 116 +++++++++++++++++-- .../couchbase/CouchbaseSyncGatewayImpl.java | 5 + .../CouchbaseSyncGatewaySshDriver.java | 34 ++++-- 4 files changed, 175 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7d0467d8/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java index b7c2026..f424eaf 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java @@ -48,23 +48,27 @@ import brooklyn.event.basic.DependentConfiguration; import brooklyn.event.feed.http.HttpFeed; import brooklyn.event.feed.http.HttpPollConfig; import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.event.feed.http.JsonFunctions; import brooklyn.location.Location; import brooklyn.policy.PolicySpec; import brooklyn.util.collections.MutableSet; +import brooklyn.util.guava.Functionals; import brooklyn.util.task.DynamicTasks; import brooklyn.util.task.TaskBuilder; import brooklyn.util.task.Tasks; import brooklyn.util.text.ByteSizeStrings; +import brooklyn.util.text.Strings; import brooklyn.util.time.Time; import com.google.common.base.Function; -import com.google.common.base.Functions; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster { private static final Logger log = LoggerFactory.getLogger(CouchbaseClusterImpl.class); @@ -141,6 +145,8 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas connectSensors(); connectEnrichers(); + + setAttribute(BUCKET_CREATION_IN_PROGRESS, false); //start timeout before adding the servers Time.sleep(getConfig(SERVICE_UP_TIME_OUT)); @@ -167,10 +173,12 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas } finally { Tasks.resetBlockingDetails(); } - Entities.invokeEffector(this, getPrimaryNode(), CouchbaseNode.REBALANCE); + + ((CouchbaseNode)getPrimaryNode()).rebalance(); if (Optional.fromNullable(CREATE_BUCKETS).isPresent()) { createBuckets(); + DependentConfiguration.waitInTaskForAttributeReady(this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); } setAttribute(IS_CLUSTER_INITIALIZED, true); @@ -187,6 +195,9 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas @Override public void stop() { + if (resetBucketCreation[0] != null) { + resetBucketCreation[0].stop(); + } super.stop(); } @@ -205,7 +216,7 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas .displayName("Controller targets tracker") .configure("group", this)); } - + public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { @Override protected void onEntityChange(Entity member) { ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member); @@ -377,17 +388,41 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas CouchbaseClusterImpl.this.resetBucketCreation[0].stop(); } setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true); - + CouchbaseClusterImpl.this.resetBucketCreation[0] = HttpFeed.builder() .entity(CouchbaseClusterImpl.this) .period(500, TimeUnit.MILLISECONDS) .baseUri(String.format("%s/pools/default/buckets/%s", primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL), bucketName)) .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD)) .poll(new HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS) - .onSuccess(HttpValueFunctions.responseCodeEquals(404)) - .onFailureOrException(Functions.constant(false))) + .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walkN("nodes"), new Function<JsonElement, Boolean>() { + @Override public Boolean apply(JsonElement input) { + // Wait until bucket has been created on all nodes and the couchApiBase element has been published (indicating that the bucket is useable) + JsonArray servers = input.getAsJsonArray(); + if (servers.size() != CouchbaseClusterImpl.this.getMembers().size()) { + return true; + } + for (JsonElement server : servers) { + Object api = server.getAsJsonObject().get("couchApiBase"); + if (api == null || Strings.isEmpty(String.valueOf(api))) { + return true; + } + } + return false; + } + })) + .onFailureOrException(new Function<Object, Boolean>() { + @Override + public Boolean apply(Object input) { + if (((brooklyn.util.http.HttpToolResponse) input).getResponseCode() == 404) { + return true; + } + throw new IllegalStateException("Unexpected response when creating bucket:" + input); + } + })) .build(); + // TODO: Bail out if bucket creation fails, to allow next bucket to proceed Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); if (CouchbaseClusterImpl.this.resetBucketCreation[0] != null) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7d0467d8/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java index e02ddc9..6f81b73 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java @@ -24,19 +24,38 @@ import static brooklyn.util.ssh.BashCommands.chainGroup; import static brooklyn.util.ssh.BashCommands.sudo; import static java.lang.String.format; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.http.auth.Credentials; +import org.apache.http.auth.UsernamePasswordCredentials; import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; import brooklyn.entity.basic.Entities; import brooklyn.entity.drivers.downloads.DownloadResolver; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.event.feed.http.JsonFunctions; import brooklyn.location.OsDetails; import brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.guava.Functionals; +import brooklyn.util.http.HttpTool; +import brooklyn.util.http.HttpToolResponse; +import brooklyn.util.repeat.Repeater; import brooklyn.util.ssh.BashCommands; import brooklyn.util.task.Tasks; import brooklyn.util.time.Duration; import brooklyn.util.time.Time; +import com.google.common.base.Function; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseNodeDriver { @@ -193,8 +212,83 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp .failOnNonZeroResultCode() .execute(); entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "Rebalance Started"); + // wait until the re-balance is complete + Repeater.create() + .every(Duration.millis(500)) + .limitTimeTo(Duration.THIRTY_SECONDS) + .until(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + for (String nodeHostName : CouchbaseNodeSshDriver.this.getNodeHostNames()) { + if (isNodeRebalancing(nodeHostName)) { + return true; + } + } + return false; + } + }) + .run(); + Repeater.create() + .every(Duration.FIVE_SECONDS) + .limitTimeTo(Duration.FIVE_MINUTES) + .until(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + for (String nodeHostName : CouchbaseNodeSshDriver.this.getNodeHostNames()) { + if (isNodeRebalancing(nodeHostName)) { + return false; + } + } + return true; + } + }) + .run(); + log.info("rebalanced cluster via primary node {}", getEntity()); } + private Iterable<String> getNodeHostNames() throws URISyntaxException { + Function<JsonElement, Iterable<String>> getNodesAsList = new Function<JsonElement, Iterable<String>>() { + @Override public Iterable<String> apply(JsonElement input) { + if (input == null) { + return Collections.emptyList(); + } + Collection<String> names = Lists.newArrayList(); + JsonArray nodes = input.getAsJsonArray(); + for (JsonElement element : nodes) { + // NOTE: the 'hostname' element also includes the port + names.add(element.getAsJsonObject().get("hostname").toString().replace("\"", "")); + } + return names; + } + }; + HttpToolResponse nodesResponse = getAPIResponse(String.format("http://%s:%s/pools/nodes", getHostname(), getWebPort())); + return Functionals.chain( + HttpValueFunctions.jsonContents(), + JsonFunctions.walkN("nodes"), + getNodesAsList + ).apply(nodesResponse); + } + + private boolean isNodeRebalancing(String nodeHostName) throws URISyntaxException { + HttpToolResponse response = getAPIResponse("http://" + nodeHostName + "/pools/nodes/rebalanceProgress"); + if (response.getResponseCode() != 200) { + throw new IllegalStateException("failed to rebalance cluster: " + response); + } + return !HttpValueFunctions.jsonContents("status", String.class).apply(response).equals("none"); + } + + private HttpToolResponse getAPIResponse(String path) throws URISyntaxException { + URI uri = new URI(path); + Credentials credentials = new UsernamePasswordCredentials(getUsername(), getPassword()); + return HttpTool.httpGet(HttpTool.httpClientBuilder() + // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials + .uri(uri) + .credentials(credentials) + .build(), + uri, + ImmutableMap.<String, String>of()); + } + @Override public void serverAdd(String serverToAdd, String username, String password) { newScript("serverAdd").body.append(couchbaseCli("server-add") @@ -219,15 +313,15 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp } @Override - public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { - newScript("bucketCreate").body.append(couchbaseCli("bucket-create") - + getCouchbaseHostnameAndCredentials() + - " --bucket=" + bucketName + - " --bucket-type=" + bucketType + - " --bucket-port=" + bucketPort + - " --bucket-ramsize=" + bucketRamSize + - " --bucket-replica=" + bucketReplica) - .failOnNonZeroResultCode() - .execute(); - } + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { + newScript("bucketCreate").body.append(couchbaseCli("bucket-create") + + getCouchbaseHostnameAndCredentials() + + " --bucket=" + bucketName + + " --bucket-type=" + bucketType + + " --bucket-port=" + bucketPort + + " --bucket-ramsize=" + bucketRamSize + + " --bucket-replica=" + bucketReplica) + .failOnNonZeroResultCode() + .execute(); + } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7d0467d8/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java index 09ef569..0ead110 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java @@ -1,6 +1,7 @@ package brooklyn.entity.nosql.couchbase; +import brooklyn.config.render.RendererHints; import brooklyn.entity.basic.SoftwareProcessImpl; import brooklyn.event.feed.http.HttpFeed; import brooklyn.event.feed.http.HttpPollConfig; @@ -58,4 +59,8 @@ public class CouchbaseSyncGatewayImpl extends SoftwareProcessImpl implements Cou httpFeed.stop(); } } + + static { + RendererHints.register(MANAGEMENT_URL, new RendererHints.NamedActionWithUrl("Open")); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7d0467d8/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java index c7c9129..7339c40 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java @@ -12,18 +12,21 @@ import javax.annotation.Nullable; import brooklyn.entity.Entity; import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; -import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.EntityLocal; import brooklyn.entity.drivers.downloads.DownloadResolver; +import brooklyn.event.basic.DependentConfiguration; import brooklyn.location.OsDetails; import brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.collections.MutableMap; import brooklyn.util.ssh.BashCommands; import brooklyn.util.time.Duration; import com.google.common.base.Optional; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseSyncGatewayDriver { @@ -32,11 +35,6 @@ public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDri } @Override - public boolean isRunning() { - return Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP)); - } - - @Override public void stop() { } @@ -67,8 +65,13 @@ public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDri public void launch() { Entity cbNode = entity.getConfig(CouchbaseSyncGateway.COUCHBASE_SERVER); Entities.waitForServiceUp(cbNode, Duration.ONE_HOUR); - - + DependentConfiguration.waitInTaskForAttributeReady(cbNode, CouchbaseCluster.IS_CLUSTER_INITIALIZED, Predicates.equalTo(true)); + try { + // Even once the bucket has published its API URL, it can still take a couple of seconds for it to become available + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + // no-op + } if (cbNode instanceof CouchbaseCluster) { Optional<Entity> cbClusterNode = Iterables.tryFind(cbNode.getAttribute(CouchbaseCluster.GROUP_MEMBERS), new Predicate<Entity>() { @@ -106,10 +109,21 @@ public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDri String options = format("-url %s -bucket %s -adminInterface 0.0.0.0:%s -interface 0.0.0.0:%s -pool %s %s %s", serverWebAdminUrl, bucketName, adminRestApiPort, syncRestApiPort, pool, pretty, verbose); - newScript(LAUNCHING) - .body.append(format("/opt/couchbase-sync-gateway/bin/sync_gateway %s &", options)) + newScript(ImmutableMap.of("usePidFile", true), LAUNCHING) + .body.append(format("/opt/couchbase-sync-gateway/bin/sync_gateway %s ", options) + "> out.log 2> err.log < /dev/null &") + .failOnNonZeroResultCode() .execute(); } + + @Override + public boolean isRunning() { + return newScript(MutableMap.of("usePidFile", true), CHECK_RUNNING).execute() == 0; + } + + @Override + public void kill() { + newScript(MutableMap.of("usePidFile", true), KILLING).execute(); + } private List<String> installLinux(List<String> urls, String saveAs) {
