Copied in @ZaidM's work from https://github.com/ZaidM/brooklyn/tree/couchbase-sync-gateway1
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/4f70aa11 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/4f70aa11 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/4f70aa11 Branch: refs/heads/master Commit: 4f70aa1172f0900c4a77f9c5a03d3132900c0440 Parents: 21a037f Author: Martin Harris <[email protected]> Authored: Fri Jun 20 10:34:39 2014 +0100 Committer: Martin Harris <[email protected]> Committed: Mon Aug 18 15:45:43 2014 +0100 ---------------------------------------------------------------------- .../nosql/couchbase/CouchbaseCluster.java | 23 ++- .../nosql/couchbase/CouchbaseClusterImpl.java | 68 ++++++++- .../entity/nosql/couchbase/CouchbaseNode.java | 6 + .../nosql/couchbase/CouchbaseNodeDriver.java | 2 + .../nosql/couchbase/CouchbaseNodeImpl.java | 4 + .../nosql/couchbase/CouchbaseNodeSshDriver.java | 12 ++ .../nosql/couchbase/CouchbaseSyncGateway.java | 61 ++++++++ .../couchbase/CouchbaseSyncGatewayDriver.java | 9 ++ .../couchbase/CouchbaseSyncGatewayImpl.java | 61 ++++++++ .../CouchbaseSyncGatewaySshDriver.java | 147 +++++++++++++++++++ 10 files changed, 390 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4f70aa11/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java index b44f47a..c478b07 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java @@ -19,10 +19,9 @@ package brooklyn.entity.nosql.couchbase; import java.util.List; +import java.util.Map; import java.util.Set; -import com.google.common.reflect.TypeToken; - import brooklyn.config.ConfigKey; import brooklyn.entity.Entity; import brooklyn.entity.basic.ConfigKeys; @@ -33,6 +32,8 @@ import brooklyn.event.basic.Sensors; import brooklyn.util.flags.SetFromFlag; import brooklyn.util.time.Duration; +import com.google.common.reflect.TypeToken; + @ImplementedBy(CouchbaseClusterImpl.class) public interface CouchbaseCluster extends DynamicCluster { @@ -90,4 +91,22 @@ public interface CouchbaseCluster extends DynamicCluster { "Average across cluster for pools/nodes/<current node>/interestingStats/couch_docs_actual_disk_size"); AttributeSensor<Long> COUCH_VIEWS_DATA_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.views.data.size", "Average across cluster for pools/nodes/<current node>/interestingStats/couch_views_data_size"); + + AttributeSensor<Boolean> BUCKET_CREATION_IN_PROGRESS = Sensors.newBooleanSensor("couchbase.cluster.bucketCreationInProgress", "Indicates that a bucket is currently being created, and" + + "further bucket creation should be deferred"); + + /** + * createBuckets is a list of all the buckets to be created on the couchbase cluster + * the buckets will be created on the primary node of the cluster + * each map entry for a bucket should contain the following parameters: + * - <"bucket",(String) name of the bucket (default: default)> + * - <"bucket-type",(String) name of bucket type (default: couchbase)> + * - <"bucket-port",(Integer) the bucket port to connect to (default: 11222)> + * - <"bucket-ramsize",(Integer) ram size allowed for bucket (default: 200)> + * - <"bucket-replica",(Integer) number of replicas for the bucket (default: 1)> + */ + @SuppressWarnings("serial") + @SetFromFlag("createBuckets") + ConfigKey<List<Map<String, Object>>> CREATE_BUCKETS = ConfigKeys.newConfigKey(new TypeToken<List<Map<String, Object>>>() {}, + "couchbase.cluster.createBuckets", "a list of all dedicated port buckets to be created on the couchbase cluster"); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4f70aa11/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java index 11c56d7..b7c2026 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,16 +44,24 @@ import brooklyn.entity.trait.Startable; import brooklyn.event.AttributeSensor; import brooklyn.event.SensorEvent; import brooklyn.event.SensorEventListener; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.event.feed.http.HttpFeed; +import brooklyn.event.feed.http.HttpPollConfig; +import brooklyn.event.feed.http.HttpValueFunctions; import brooklyn.location.Location; import brooklyn.policy.PolicySpec; import brooklyn.util.collections.MutableSet; +import brooklyn.util.task.DynamicTasks; +import brooklyn.util.task.TaskBuilder; import brooklyn.util.task.Tasks; import brooklyn.util.text.ByteSizeStrings; import brooklyn.util.time.Time; import com.google.common.base.Function; +import com.google.common.base.Functions; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -59,6 +69,7 @@ import com.google.common.collect.Sets; public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster { private static final Logger log = LoggerFactory.getLogger(CouchbaseClusterImpl.class); private final Object mutex = new Object[0]; + private final HttpFeed[] resetBucketCreation = new HttpFeed[]{null}; public void init() { log.info("Initializing the Couchbase cluster..."); @@ -146,7 +157,7 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas serversToAdd.remove(getPrimaryNode()); if (getUpNodes().size() >= getQuorumSize() && getUpNodes().size() > 1) { - log.info("number of SERVICE_UP nodes:{} in cluster:{} did reached Quorum:{}, adding the servers", new Object[]{getUpNodes().size(), getId(), getQuorumSize()}); + log.info("number of SERVICE_UP nodes:{} in cluster:{} reached Quorum:{}, adding the servers", new Object[]{getUpNodes().size(), getId(), getQuorumSize()}); addServers(serversToAdd); //wait for servers to be added to the couchbase server @@ -157,6 +168,10 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas Tasks.resetBlockingDetails(); } Entities.invokeEffector(this, getPrimaryNode(), CouchbaseNode.REBALANCE); + + if (Optional.fromNullable(CREATE_BUCKETS).isPresent()) { + createBuckets(); + } setAttribute(IS_CLUSTER_INITIALIZED, true); } else { @@ -333,6 +348,57 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements Couchbas return Optional.fromNullable(e.getAttribute(CouchbaseNode.IS_IN_CLUSTER)).or(false); } + public void createBuckets() { + //FIXME: multiple buckets require synchronization/wait time (checks for port conflicts and exceeding ram size) + //TODO: check for multiple bucket conflicts with port + List<Map<String, Object>> bucketsToCreate = getConfig(CREATE_BUCKETS); + Entity primaryNode = getPrimaryNode(); + + for (Map<String, Object> bucketMap : bucketsToCreate) { + String bucketName = bucketMap.containsKey("bucket") ? (String) bucketMap.get("bucket") : "default"; + String bucketType = bucketMap.containsKey("bucket-type") ? (String) bucketMap.get("bucket-type") : "couchbase"; + Integer bucketPort = bucketMap.containsKey("bucket-port") ? (Integer) bucketMap.get("bucket-port") : 11222; + Integer bucketRamSize = bucketMap.containsKey("bucket-ramsize") ? (Integer) bucketMap.get("bucket-ramsize") : 200; + Integer bucketReplica = bucketMap.containsKey("bucket-replica") ? (Integer) bucketMap.get("bucket-replica") : 1; + + log.info("adding bucket: {} to primary node: {}", bucketName, primaryNode.getId()); + createBucket(primaryNode, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); + //TODO: add if bucket has been created. + } + } + + public void createBucket(final Entity primaryNode, final String bucketName, final String bucketType, final Integer bucketPort, final Integer bucketRamSize, final Integer bucketReplica) { + DynamicTasks.queueIfPossible(TaskBuilder.<Void>builder().name("Creating bucket " + bucketName).body( + new Callable<Void>() { + @Override + public Void call() throws Exception { + DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); + if (CouchbaseClusterImpl.this.resetBucketCreation[0] != null) { + CouchbaseClusterImpl.this.resetBucketCreation[0].stop(); + } + setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true); + + CouchbaseClusterImpl.this.resetBucketCreation[0] = HttpFeed.builder() + .entity(CouchbaseClusterImpl.this) + .period(500, TimeUnit.MILLISECONDS) + .baseUri(String.format("%s/pools/default/buckets/%s", primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL), bucketName)) + .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD)) + .poll(new HttpPollConfig<Boolean>(BUCKET_CREATION_IN_PROGRESS) + .onSuccess(HttpValueFunctions.responseCodeEquals(404)) + .onFailureOrException(Functions.constant(false))) + .build(); + + Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); + DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); + if (CouchbaseClusterImpl.this.resetBucketCreation[0] != null) { + CouchbaseClusterImpl.this.resetBucketCreation[0].stop(); + } + return null; + } + } + ).build()).orSubmitAndBlock(); + } + static { RendererHints.register(COUCH_DOCS_DATA_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4f70aa11/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java index a27045f..dd46eba 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java @@ -122,6 +122,7 @@ public interface CouchbaseNode extends SoftwareProcess { MethodEffector<Void> SERVER_ADD = new MethodEffector<Void>(CouchbaseNode.class, "serverAdd"); MethodEffector<Void> SERVER_ADD_AND_REBALANCE = new MethodEffector<Void>(CouchbaseNode.class, "serverAddAndRebalance"); MethodEffector<Void> REBALANCE = new MethodEffector<Void>(CouchbaseNode.class, "rebalance"); + MethodEffector<Void> BUCKET_CREATE = new MethodEffector<Void>(CouchbaseNode.class, "bucketCreate"); @Effector(description = "add a server to a cluster") public void serverAdd(@EffectorParam(name = "serverHostname") String serverToAdd, @EffectorParam(name = "username") String username, @EffectorParam(name = "password") String password); @@ -131,5 +132,10 @@ public interface CouchbaseNode extends SoftwareProcess { @Effector(description = "rebalance the couchbase cluster") public void rebalance(); + + @Effector(description = "create a new bucket") + public void bucketCreate(@EffectorParam(name = "bucketName") String bucketName, @EffectorParam(name = "bucketType") String bucketType, + @EffectorParam(name = "bucketPort") Integer bucketPort, @EffectorParam(name = "bucketRamSize") Integer bucketRamSize, + @EffectorParam(name = "bucketReplica") Integer bucketReplica); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4f70aa11/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java index 9fc5877..b77f244 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java @@ -26,6 +26,8 @@ public interface CouchbaseNodeDriver extends SoftwareProcessDriver { public void serverAdd(String serverToAdd, String username, String password); public void rebalance(); + + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica); public void serverAddAndRebalance(String serverToAdd, String username, String password); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4f70aa11/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java index e556c5d..11e36bc 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java @@ -181,4 +181,8 @@ public class CouchbaseNodeImpl extends SoftwareProcessImpl implements CouchbaseN } } + @Override + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { + getDriver().bucketCreate(bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); + } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4f70aa11/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java index d55f7df..e02ddc9 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java @@ -218,4 +218,16 @@ public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver imp entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "Rebalance Started"); } + @Override + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { + newScript("bucketCreate").body.append(couchbaseCli("bucket-create") + + getCouchbaseHostnameAndCredentials() + + " --bucket=" + bucketName + + " --bucket-type=" + bucketType + + " --bucket-port=" + bucketPort + + " --bucket-ramsize=" + bucketRamSize + + " --bucket-replica=" + bucketReplica) + .failOnNonZeroResultCode() + .execute(); + } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4f70aa11/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java new file mode 100644 index 0000000..ddea721 --- /dev/null +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java @@ -0,0 +1,61 @@ +package brooklyn.entity.nosql.couchbase; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; + +@ImplementedBy(CouchbaseSyncGatewayImpl.class) +public interface CouchbaseSyncGateway extends SoftwareProcess { + + @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, + "1.0-beta3.1"); + + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + SoftwareProcess.DOWNLOAD_URL, "http://packages.couchbase.com/releases/couchbase-sync-gateway/1.0-beta/couchbase-sync-gateway-community_${version}_${driver.osTag}"); + + @SetFromFlag("couchbaseServer") + ConfigKey<Entity> COUCHBASE_SERVER = ConfigKeys.newConfigKey(Entity.class, "couchbaseSyncGateway.couchbaseNode", + "Couchbase server node or cluster the sync gateway connects to"); + + @SetFromFlag("serverPool") + ConfigKey<String> COUCHBASE_SERVER_POOL = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverPool", + "Couchbase Server pool name in which to find buckets", "default"); + + @SetFromFlag("couchbaseServerBucket") + ConfigKey<String> COUCHBASE_SERVER_BUCKET = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverBucket", + "Name of the Couchbase bucket to use", "sync_gateway"); + + @SetFromFlag("couchbaseServerUrl") + ConfigKey<String> COUCHBASE_SERVER_URL = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.couchbaseServerUrl", + "Couchbase Server Admin Url to connect the gateway to"); + + @SetFromFlag("pretty") + ConfigKey<Boolean> PRETTY = ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.pretty", + "Pretty-print JSON responses. This is useful for debugging, but reduces performance.", false); + + @SetFromFlag("verbose") + ConfigKey<Boolean> VERBOSE = ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.verbose", + "Logs more information about requests.", false); + + AttributeSensor<String> COUCHBASE_SERVER_WEB_URL = Sensors.newStringSensor("couchbaseSyncGateway.serverWebUrl", + "The Url and web port of the couchbase server to connect to"); + + AttributeSensor<String> MANAGEMENT_URL = Sensors.newStringSensor("coucbaseSyncGateway.managementUrl", + "Management URL for Couchbase Sycn Gateway"); + + PortAttributeSensorAndConfigKey SYNC_REST_API_PORT = new PortAttributeSensorAndConfigKey("couchbaseSyncGateway.syncRestPort", + "Port the Sync REST API listens on", "4984"); + + PortAttributeSensorAndConfigKey ADMIN_REST_API_PORT = new PortAttributeSensorAndConfigKey("couchbaseSyncGateway.adminRestPort", + "Port the Admin REST API listens on", "4985"); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4f70aa11/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java new file mode 100644 index 0000000..b1b4339 --- /dev/null +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java @@ -0,0 +1,9 @@ +package brooklyn.entity.nosql.couchbase; + +import brooklyn.entity.basic.SoftwareProcessDriver; + +public interface CouchbaseSyncGatewayDriver extends SoftwareProcessDriver { + + public String getOsTag(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4f70aa11/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java new file mode 100644 index 0000000..09ef569 --- /dev/null +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java @@ -0,0 +1,61 @@ +package brooklyn.entity.nosql.couchbase; + + +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.event.feed.http.HttpFeed; +import brooklyn.event.feed.http.HttpPollConfig; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.location.access.BrooklynAccessUtils; + +import com.google.common.net.HostAndPort; + +public class CouchbaseSyncGatewayImpl extends SoftwareProcessImpl implements CouchbaseSyncGateway { + + private HttpFeed httpFeed; + + @Override + public Class<CouchbaseSyncGatewayDriver> getDriverInterface() { + return CouchbaseSyncGatewayDriver.class; + } + + @Override + protected void connectSensors() { + super.connectSensors(); + connectServiceUpIsRunning(); + } + + @Override + protected void connectServiceUpIsRunning() { + + + HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, + getAttribute(CouchbaseSyncGateway.ADMIN_REST_API_PORT)); + + String managementUri = String.format("http://%s:%s", + hp.getHostText(), hp.getPort()); + + setAttribute(MANAGEMENT_URL, managementUri); + + httpFeed = HttpFeed.builder() + .entity(this) + .period(200) + .baseUri(managementUri) + .poll(new HttpPollConfig<Boolean>(SERVICE_UP) + .onSuccess(HttpValueFunctions.responseCodeEquals(200))) + .build(); + + } + + @Override + protected void disconnectSensors() { + super.disconnectSensors(); + disconnectServiceUpIsRunning(); + } + + @Override + protected void disconnectServiceUpIsRunning() { + if (httpFeed != null) { + httpFeed.stop(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4f70aa11/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java new file mode 100644 index 0000000..c7c9129 --- /dev/null +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java @@ -0,0 +1,147 @@ +package brooklyn.entity.nosql.couchbase; + +import static brooklyn.util.ssh.BashCommands.INSTALL_CURL; +import static brooklyn.util.ssh.BashCommands.alternatives; +import static brooklyn.util.ssh.BashCommands.chainGroup; +import static brooklyn.util.ssh.BashCommands.sudo; +import static java.lang.String.format; + +import java.util.List; + +import javax.annotation.Nullable; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.entity.drivers.downloads.DownloadResolver; +import brooklyn.location.OsDetails; +import brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.ssh.BashCommands; +import brooklyn.util.time.Duration; + +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseSyncGatewayDriver { + public CouchbaseSyncGatewaySshDriver(EntityLocal entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + public boolean isRunning() { + return Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP)); + } + + @Override + public void stop() { + + } + + @Override + public void install() { + //reference http://docs.couchbase.com/sync-gateway/#getting-started-with-sync-gateway + DownloadResolver resolver = Entities.newDownloader(this); + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + + OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); + + log.info("Installing couchbase-sync-gateway version: {}", getVersion()); + if (osDetails.isLinux()) { + List<String> commands = installLinux(urls, saveAs); + newScript(INSTALLING) + .body.append(commands).execute(); + } + } + + @Override + public void customize() { + + } + + @Override + public void launch() { + Entity cbNode = entity.getConfig(CouchbaseSyncGateway.COUCHBASE_SERVER); + Entities.waitForServiceUp(cbNode, Duration.ONE_HOUR); + + + if (cbNode instanceof CouchbaseCluster) { + Optional<Entity> cbClusterNode = Iterables.tryFind(cbNode.getAttribute(CouchbaseCluster.GROUP_MEMBERS), new Predicate<Entity>() { + + @Override + public boolean apply(@Nullable Entity entity) { + if (entity instanceof CouchbaseNode && Boolean.TRUE.equals(entity.getAttribute(CouchbaseNode.IS_IN_CLUSTER))) { + return true; + } + return false; + } + }); + if (cbClusterNode.isPresent()) { + cbNode = cbClusterNode.get(); + } else { + throw new IllegalArgumentException(format("The cluster %s does not contain any suitable Couchbase nodes to connect to..", cbNode.getId())); + } + + } + String hostname = cbNode.getAttribute(CouchbaseNode.HOSTNAME); + String webPort = cbNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT).toString(); + + + String username = cbNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME); + String password = cbNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD); + + String bucketName = entity.getConfig(CouchbaseSyncGateway.COUCHBASE_SERVER_BUCKET); + String pool = entity.getConfig(CouchbaseSyncGateway.COUCHBASE_SERVER_POOL); + String pretty = entity.getConfig(CouchbaseSyncGateway.PRETTY) ? "-pretty" : ""; + String verbose = entity.getConfig(CouchbaseSyncGateway.VERBOSE) ? "-verbose" : ""; + + String adminRestApiPort = entity.getConfig(CouchbaseSyncGateway.ADMIN_REST_API_PORT).iterator().next().toString(); + String syncRestApiPort = entity.getConfig(CouchbaseSyncGateway.SYNC_REST_API_PORT).iterator().next().toString(); + + String serverWebAdminUrl = format("http://%s:%s@%s:%s", username, password, hostname, webPort); + String options = format("-url %s -bucket %s -adminInterface 0.0.0.0:%s -interface 0.0.0.0:%s -pool %s %s %s", + serverWebAdminUrl, bucketName, adminRestApiPort, syncRestApiPort, pool, pretty, verbose); + + newScript(LAUNCHING) + .body.append(format("/opt/couchbase-sync-gateway/bin/sync_gateway %s &", options)) + .execute(); + } + + private List<String> installLinux(List<String> urls, String saveAs) { + + String apt = chainGroup( + "which apt-get", + sudo("apt-get update"), + sudo(format("dpkg -i %s", saveAs))); + + String yum = chainGroup( + "which yum", + sudo(format("rpm --install %s", saveAs))); + + return ImmutableList.<String>builder() + .add(INSTALL_CURL) + .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) + .add(alternatives(apt, yum)) + .build(); + } + + @Override + public String getOsTag() { + OsDetails os = getLocation().getOsDetails(); + if (os == null) { + // Default to generic linux + return "x86_64.rpm"; + } else { + //FIXME should be a better way to check for OS name and version + String osName = os.getName().toLowerCase(); + String fileExtension = osName.contains("deb") || osName.contains("ubuntu") ? ".deb" : ".rpm"; + String arch = os.is64bit() ? "x86_64" : "x86"; + return arch + fileExtension; + } + } + +} \ No newline at end of file
