http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBClientSupport.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBClientSupport.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBClientSupport.java new file mode 100644 index 0000000..d9997aa --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBClientSupport.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.mongodb; + +import java.net.UnknownHostException; + +import org.bson.BSONObject; +import org.bson.BasicBSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.net.HostAndPort; +import com.mongodb.BasicDBObject; +import com.mongodb.CommandResult; +import com.mongodb.DB; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoException; +import com.mongodb.ServerAddress; + +import brooklyn.location.access.BrooklynAccessUtils; +import brooklyn.util.BrooklynNetworkUtils; + +/** + * Manages connections to standalone MongoDB servers. + * + * @see <a href="http://docs.mongodb.org/manual/reference/command/">MongoDB database command documentation</a> + */ +public class MongoDBClientSupport { + + private static final Logger LOG = LoggerFactory.getLogger(MongoDBClientSupport.class); + + private ServerAddress address; + + private MongoClient client() { + return new MongoClient(address, connectionOptions); + } + + // Set client to automatically reconnect to servers. + private static final MongoClientOptions connectionOptions = MongoClientOptions.builder() + .autoConnectRetry(true) + .socketKeepAlive(true) + .build(); + + private static final BasicBSONObject EMPTY_RESPONSE = new BasicBSONObject(); + + public MongoDBClientSupport(ServerAddress standalone) { + // We could also use a MongoClient to access an entire replica set. See MongoClient(List<ServerAddress>). + address = standalone; + } + + /** + * Creates a {@link MongoDBClientSupport} instance in standalone mode. 
+ */ + public static MongoDBClientSupport forServer(AbstractMongoDBServer standalone) throws UnknownHostException { + HostAndPort hostAndPort = BrooklynAccessUtils.getBrooklynAccessibleAddress(standalone, standalone.getAttribute(MongoDBServer.PORT)); + ServerAddress address = new ServerAddress(hostAndPort.getHostText(), hostAndPort.getPort()); + return new MongoDBClientSupport(address); + } + + private ServerAddress getServerAddress() { + MongoClient client = client(); + try { + return client.getServerAddressList().get(0); + } finally { + client.close(); + } + } + + private HostAndPort getServerHostAndPort() { + ServerAddress address = getServerAddress(); + return HostAndPort.fromParts(address.getHost(), address.getPort()); + } + + public Optional<CommandResult> runDBCommand(String database, String command) { + return runDBCommand(database, new BasicDBObject(command, Boolean.TRUE)); + } + + private Optional<CommandResult> runDBCommand(String database, DBObject command) { + MongoClient client = client(); + try { + DB db = client.getDB(database); + CommandResult status; + try { + status = db.command(command); + } catch (MongoException e) { + LOG.warn("Command " + command + " on " + getServerAddress() + " failed", e); + return Optional.absent(); + } + if (!status.ok()) { + LOG.debug("Unexpected result of {} on {}: {}", + new Object[] { command, getServerAddress(), status.getErrorMessage() }); + } + return Optional.of(status); + } finally { + client.close(); + } + } + + public long getShardCount() { + MongoClient client = client(); + try { + return client.getDB("config").getCollection("shards").getCount(); + } finally { + client.close(); + } + } + + public BasicBSONObject getServerStatus() { + Optional<CommandResult> result = runDBCommand("admin", "serverStatus"); + if (result.isPresent() && result.get().ok()) { + return result.get(); + } else { + return EMPTY_RESPONSE; + } + } + + public boolean ping() { + DBObject ping = new BasicDBObject("ping", "1"); + try { + 
runDBCommand("admin", ping); + } catch (MongoException e) { + return false; + } + return true; + } + + public boolean initializeReplicaSet(String replicaSetName, Integer id) { + HostAndPort primary = getServerHostAndPort(); + BasicBSONObject config = ReplicaSetConfig.builder(replicaSetName) + .member(primary, id) + .build(); + + BasicDBObject dbObject = new BasicDBObject("replSetInitiate", config); + LOG.debug("Initiating replica set with: " + dbObject); + + Optional<CommandResult> result = runDBCommand("admin", dbObject); + if (result.isPresent() && result.get().ok() && LOG.isDebugEnabled()) { + LOG.debug("Completed initiating MongoDB replica set {} on entity {}", replicaSetName, this); + } + return result.isPresent() && result.get().ok(); + } + + /** + * Java equivalent of calling rs.conf() in the console. + */ + private BSONObject getReplicaSetConfig() { + MongoClient client = client(); + try { + return client.getDB("local").getCollection("system.replset").findOne(); + } catch (MongoException e) { + LOG.error("Failed to get replica set config on "+client, e); + return null; + } finally { + client.close(); + } + } + + /** + * Runs <code>replSetGetStatus</code> on the admin database. + * + * @return The result of <code>replSetGetStatus</code>, or + * an empty {@link BasicBSONObject} if the command threw an exception (e.g. if + * the connection was reset) or if the resultant {@link CommandResult#ok} was false. 
+ * + * @see <a href="http://docs.mongodb.org/manual/reference/replica-status/">Replica set status reference</a> + * @see <a href="http://docs.mongodb.org/manual/reference/command/replSetGetStatus/">replSetGetStatus documentation</a> + */ + public BasicBSONObject getReplicaSetStatus() { + Optional<CommandResult> result = runDBCommand("admin", "replSetGetStatus"); + if (result.isPresent() && result.get().ok()) { + return result.get(); + } else { + return EMPTY_RESPONSE; + } + } + + /** + * Reconfigures the replica set that this client is the primary member of to include a new member. + * <p/> + * Note that this can cause long downtime (typically 10-20s, even up to a minute). + * + * @param secondary New member of the set. + * @param id The id for the new set member. Must be unique within the set. + * @return True if successful + */ + public boolean addMemberToReplicaSet(MongoDBServer secondary, Integer id) { + // We need to: + // - get the existing configuration + // - update its version + // - add the new member to its list of members + // - run replSetReconfig with the new configuration. + BSONObject existingConfig = getReplicaSetConfig(); + if (existingConfig == null) { + LOG.warn("Couldn't load existing config for replica set from {}. Server {} not added.", + getServerAddress(), secondary); + return false; + } + + BasicBSONObject newConfig = ReplicaSetConfig.fromExistingConfig(existingConfig) + .primary(getServerHostAndPort()) + .member(secondary, id) + .build(); + return reconfigureReplicaSet(newConfig); + } + + /** + * Reconfigures the replica set that this client is the primary member of to + * remove the given server. + * @param server The server to remove + * @return True if successful + */ + public boolean removeMemberFromReplicaSet(MongoDBServer server) { + BSONObject existingConfig = getReplicaSetConfig(); + if (existingConfig == null) { + LOG.warn("Couldn't load existing config for replica set from {}. 
Server {} not removed.", + getServerAddress(), server); + return false; + } + BasicBSONObject newConfig = ReplicaSetConfig.fromExistingConfig(existingConfig) + .primary(getServerHostAndPort()) + .remove(server) + .build(); + return reconfigureReplicaSet(newConfig); + } + + /** + * Runs replSetReconfig with the given BasicBSONObject. Returns true if the result's + * status is ok. + */ + private boolean reconfigureReplicaSet(BasicBSONObject newConfig) { + BasicDBObject command = new BasicDBObject("replSetReconfig", newConfig); + LOG.debug("Reconfiguring replica set to: " + command); + Optional<CommandResult> result = runDBCommand("admin", command); + return result.isPresent() && result.get().ok(); + } + + public boolean addShardToRouter(String hostAndPort) { + LOG.debug("Adding shard " + hostAndPort); + BasicDBObject command = new BasicDBObject("addShard", hostAndPort); + Optional<CommandResult> result = runDBCommand("admin", command); + return result.isPresent() && result.get().ok(); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBDriver.java new file mode 100644 index 0000000..b7d93f0 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBDriver.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.mongodb; + +import brooklyn.entity.basic.SoftwareProcessDriver; + +public interface MongoDBDriver extends SoftwareProcessDriver { +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java new file mode 100644 index 0000000..b7d91db --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.mongodb; + +import java.util.Collection; +import java.util.List; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.group.Cluster; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; + +import com.google.common.reflect.TypeToken; + +/** + * A replica set of {@link MongoDBServer}s, based on {@link DynamicCluster} which can be resized by a policy + * if required. + * + * <p/><b>Note</b> + * An issue with <code>mongod</code> on Mac OS X can cause unpredictable failure of servers at start-up. + * See <a href="https://groups.google.com/forum/#!topic/mongodb-user/QRQYdIXOR2U">this mailing list post</a> + * for more information. + * + * <p/>This replica set implementation has been tested on OS X 10.6 and Ubuntu 12.04. + * + * @see <a href="http://docs.mongodb.org/manual/replication/">http://docs.mongodb.org/manual/replication/</a> + */ +@ImplementedBy(MongoDBReplicaSetImpl.class) +public interface MongoDBReplicaSet extends DynamicCluster { + + @SetFromFlag("replicaSetName") + ConfigKey<String> REPLICA_SET_NAME = ConfigKeys.newStringConfigKey( + "mongodb.replicaSet.name", "Name of the MongoDB replica set", "BrooklynCluster"); + + ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(Cluster.INITIAL_SIZE, 3); + + AttributeSensor<MongoDBServer> PRIMARY_ENTITY = Sensors.newSensor( + MongoDBServer.class, "mongodb.replicaSet.primary.entity", "The entity acting as primary"); + + @SuppressWarnings("serial") + AttributeSensor<List<String>> REPLICA_SET_ENDPOINTS = Sensors.newSensor(new TypeToken<List<String>>() {}, + "mongodb.replicaSet.endpoints", "Endpoints active for this replica set"); + + /** + * The name of the replica set. 
+ */ + String getName(); + + /** + * @return The primary MongoDB server in the replica set. + */ + MongoDBServer getPrimary(); + + /** + * @return The secondary servers in the replica set. + */ + Collection<MongoDBServer> getSecondaries(); + + /** + * @return All servers in the replica set. + */ + Collection<MongoDBServer> getReplicas(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java new file mode 100644 index 0000000..e5ce093 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.mongodb; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.enricher.Enrichers; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.basic.ServiceStateLogic; +import brooklyn.entity.group.AbstractMembershipTrackingPolicy; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.event.AttributeSensor; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.location.Location; +import brooklyn.policy.PolicySpec; +import brooklyn.util.collections.MutableList; +import brooklyn.util.collections.MutableSet; +import brooklyn.util.text.Strings; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +/** + * Implementation of {@link MongoDBReplicaSet}. + * + * Replica sets have a <i>minimum</i> of three members. + * + * Removal strategy is always {@link #NON_PRIMARY_REMOVAL_STRATEGY}. + */ +public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDBReplicaSet { + + private static final Logger LOG = LoggerFactory.getLogger(MongoDBReplicaSetImpl.class); + + // Provides IDs for replica set members. 
The first member will have ID 0. + private final AtomicInteger nextMemberId = new AtomicInteger(0); + + private MemberTrackingPolicy policy; + private final AtomicBoolean mustInitialise = new AtomicBoolean(true); + + @SuppressWarnings("unchecked") + protected static final List<AttributeSensor<Long>> SENSORS_TO_SUM = Arrays.asList( + MongoDBServer.OPCOUNTERS_INSERTS, + MongoDBServer.OPCOUNTERS_QUERIES, + MongoDBServer.OPCOUNTERS_UPDATES, + MongoDBServer.OPCOUNTERS_DELETES, + MongoDBServer.OPCOUNTERS_GETMORE, + MongoDBServer.OPCOUNTERS_COMMAND, + MongoDBServer.NETWORK_BYTES_IN, + MongoDBServer.NETWORK_BYTES_OUT, + MongoDBServer.NETWORK_NUM_REQUESTS); + + public MongoDBReplicaSetImpl() { + } + + /** + * Manages member addition and removal. + * + * It's important that this is a single thread: the concurrent addition and removal + * of members from the set would almost certainly have unintended side effects, + * like reconfigurations using outdated ReplicaSetConfig instances. + */ + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + /** true iff input is a non-null MongoDBServer with attribute REPLICA_SET_MEMBER_STATUS PRIMARY. */ + static final Predicate<Entity> IS_PRIMARY = new Predicate<Entity>() { + // getPrimary relies on instanceof check + @Override public boolean apply(@Nullable Entity input) { + return input != null + && input instanceof MongoDBServer + && ReplicaSetMemberStatus.PRIMARY.equals(input.getAttribute(MongoDBServer.REPLICA_SET_MEMBER_STATUS)); + } + }; + + /** true iff. input is a non-null MongoDBServer with attribute REPLICA_SET_MEMBER_STATUS SECONDARY. 
*/ + static final Predicate<Entity> IS_SECONDARY = new Predicate<Entity>() { + @Override public boolean apply(@Nullable Entity input) { + // getSecondaries relies on instanceof check + return input != null + && input instanceof MongoDBServer + && ReplicaSetMemberStatus.SECONDARY.equals(input.getAttribute(MongoDBServer.REPLICA_SET_MEMBER_STATUS)); + } + }; + + /** + * {@link Function} for use as the cluster's removal strategy. Chooses any entity with + * {@link MongoDBServer#IS_PRIMARY_FOR_REPLICA_SET} true last of all. + */ + private static final Function<Collection<Entity>, Entity> NON_PRIMARY_REMOVAL_STRATEGY = new Function<Collection<Entity>, Entity>() { + @Override + public Entity apply(@Nullable Collection<Entity> entities) { + checkArgument(entities != null && entities.size() > 0, "Expect list of MongoDBServers to have at least one entry"); + return Iterables.tryFind(entities, Predicates.not(IS_PRIMARY)).or(Iterables.get(entities, 0)); + } + }; + + /** @return {@link #NON_PRIMARY_REMOVAL_STRATEGY} */ + @Override + public Function<Collection<Entity>, Entity> getRemovalStrategy() { + return NON_PRIMARY_REMOVAL_STRATEGY; + } + + @Override + protected EntitySpec<?> getMemberSpec() { + return getConfig(MEMBER_SPEC, EntitySpec.create(MongoDBServer.class)); + } + + /** + * Sets {@link MongoDBServer#REPLICA_SET}. 
+ */ + @Override + protected Map<?,?> getCustomChildFlags() { + return ImmutableMap.builder() + .putAll(super.getCustomChildFlags()) + .put(MongoDBServer.REPLICA_SET, getProxy()) + .build(); + } + + @Override + public String getName() { + // FIXME: Names must be unique if the replica sets are used in a sharded cluster + return getConfig(REPLICA_SET_NAME) + this.getId(); + } + + @Override + public MongoDBServer getPrimary() { + return Iterables.tryFind(getReplicas(), IS_PRIMARY).orNull(); + } + + @Override + public Collection<MongoDBServer> getSecondaries() { + return FluentIterable.from(getReplicas()) + .filter(IS_SECONDARY) + .toList(); + } + + @Override + public Collection<MongoDBServer> getReplicas() { + return FluentIterable.from(getMembers()) + .transform(new Function<Entity, MongoDBServer>() { + @Override public MongoDBServer apply(Entity input) { + return MongoDBServer.class.cast(input); + } + }) + .toList(); + } + + /** + * Initialises the replica set with the given server as primary if {@link #mustInitialise} is true, + * otherwise schedules the addition of a new secondary. + */ + private void serverAdded(MongoDBServer server) { + LOG.debug("Server added: {}. SERVICE_UP: {}", server, server.getAttribute(MongoDBServer.SERVICE_UP)); + + // Set the primary if the replica set hasn't been initialised. + if (mustInitialise.compareAndSet(true, false)) { + if (LOG.isInfoEnabled()) + LOG.info("First server up in {} is: {}", getName(), server); + boolean replicaSetInitialised = server.initializeReplicaSet(getName(), nextMemberId.getAndIncrement()); + if (replicaSetInitialised) { + setAttribute(PRIMARY_ENTITY, server); + setAttribute(Startable.SERVICE_UP, true); + } else { + ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE); + } + } else { + if (LOG.isDebugEnabled()) + LOG.debug("Scheduling addition of member to {}: {}", getName(), server); + addSecondaryWhenPrimaryIsNonNull(server); + } + } + + /** + * Adds a server as a secondary in the replica set. 
+ * <p/> + * If {@link #getPrimary} returns non-null submit the secondary to the primary's + * {@link MongoDBClientSupport}. Otherwise, reschedule the task to run again in three + * seconds time (in the hope that next time the primary will be available). + */ + private void addSecondaryWhenPrimaryIsNonNull(final MongoDBServer secondary) { + // TODO Don't use executor, use ExecutionManager + executor.submit(new Runnable() { + @Override + public void run() { + // SERVICE_UP is not guaranteed when additional members are added to the set. + Boolean isAvailable = secondary.getAttribute(MongoDBServer.SERVICE_UP); + MongoDBServer primary = getPrimary(); + boolean reschedule; + if (Boolean.TRUE.equals(isAvailable) && primary != null) { + boolean added = primary.addMemberToReplicaSet(secondary, nextMemberId.incrementAndGet()); + if (added) { + LOG.info("{} added to replica set {}", secondary, getName()); + reschedule = false; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("{} could not be added to replica set via {}; rescheduling", secondary, getName()); + } + reschedule = true; + } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Rescheduling addition of member {} to replica set {}: service_up={}, primary={}", + new Object[] {secondary, getName(), isAvailable, primary}); + } + reschedule = true; + } + + if (reschedule) { + // TODO Could limit number of retries + executor.schedule(this, 3, TimeUnit.SECONDS); + } + } + }); + } + + /** + * Removes a server from the replica set. + * <p/> + * Submits a task that waits for the member to be down and for the replica set to have a primary + * member, then reconfigures the set to remove the member, to {@link #executor}. If either of the + * two conditions are not met then the task reschedules itself. + * + * @param member The server to be removed from the replica set. 
+ */ + private void serverRemoved(final MongoDBServer member) { + if (LOG.isDebugEnabled()) + LOG.debug("Scheduling removal of member from {}: {}", getName(), member); + // FIXME is there a chance of race here? + if (member.equals(getAttribute(PRIMARY_ENTITY))) + setAttribute(PRIMARY_ENTITY, null); + executor.submit(new Runnable() { + @Override + public void run() { + // Wait until the server has been stopped before reconfiguring the set. Quoth the MongoDB doc: + // for best results always shut down the mongod instance before removing it from a replica set. + Boolean isAvailable = member.getAttribute(MongoDBServer.SERVICE_UP); + // Wait for the replica set to elect a new primary if the set is reconfiguring itself. + MongoDBServer primary = getPrimary(); + boolean reschedule; + + if (primary != null && !isAvailable) { + boolean removed = primary.removeMemberFromReplicaSet(member); + if (removed) { + LOG.info("Removed {} from replica set {}", member, getName()); + reschedule = false; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("{} could not be removed from replica set via {}; rescheduling", member, getName()); + } + reschedule = true; + } + + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Rescheduling removal of member {} from replica set {}: service_up={}, primary={}", + new Object[]{member, getName(), isAvailable, primary}); + } + reschedule = true; + } + + if (reschedule) { + // TODO Could limit number of retries + executor.schedule(this, 3, TimeUnit.SECONDS); + } + } + }); + } + + @Override + public void start(Collection<? extends Location> locations) { + // Promises that all the cluster's members have SERVICE_UP true on returning. 
+ super.start(locations); + policy = addPolicy(PolicySpec.create(MemberTrackingPolicy.class) + .displayName(getName() + " membership tracker") + .configure("group", this)); + + for (AttributeSensor<Long> sensor: SENSORS_TO_SUM) + addEnricher(Enrichers.builder() + .aggregating(sensor) + .publishing(sensor) + .fromMembers() + .computingSum() + .valueToReportIfNoSensors(null) + .defaultValueForUnreportedSensors(null) + .build()); + + // FIXME would it be simpler to have a *subscription* on four or five sensors on allMembers, including SERVICE_UP + // (which we currently don't check), rather than an enricher, and call to an "update" method? + addEnricher(Enrichers.builder() + .aggregating(MongoDBServer.REPLICA_SET_PRIMARY_ENDPOINT) + .publishing(MongoDBServer.REPLICA_SET_PRIMARY_ENDPOINT) + .fromMembers() + .valueToReportIfNoSensors(null) + .computing(new Function<Collection<String>, String>() { + @Override + public String apply(Collection<String> input) { + if (input==null || input.isEmpty()) return null; + Set<String> distinct = MutableSet.of(); + for (String endpoint: input) + if (!Strings.isBlank(endpoint)) + distinct.add(endpoint); + if (distinct.size()>1) + LOG.warn("Mongo replica set "+MongoDBReplicaSetImpl.this+" detetcted multiple masters (transitioning?): "+distinct); + return input.iterator().next(); + }}) + .build()); + + addEnricher(Enrichers.builder() + .aggregating(MongoDBServer.MONGO_SERVER_ENDPOINT) + .publishing(REPLICA_SET_ENDPOINTS) + .fromMembers() + .valueToReportIfNoSensors(null) + .computing(new Function<Collection<String>, List<String>>() { + @Override + public List<String> apply(Collection<String> input) { + Set<String> endpoints = new TreeSet<String>(); + for (String endpoint: input) { + if (!Strings.isBlank(endpoint)) { + endpoints.add(endpoint); + } + } + return MutableList.copyOf(endpoints); + }}) + .build()); + + subscribeToMembers(this, MongoDBServer.IS_PRIMARY_FOR_REPLICA_SET, new SensorEventListener<Boolean>() { + @Override public void 
onEvent(SensorEvent<Boolean> event) { + if (Boolean.TRUE == event.getValue()) + setAttribute(PRIMARY_ENTITY, (MongoDBServer)event.getSource()); + } + }); + + } + + @Override + public void stop() { + // Do we want to remove the members from the replica set? + // - if the set is being stopped forever it's irrelevant + // - if the set might be restarted I think it just inconveniences us + // Terminate the executor immediately. + // TODO Note that after this the executor will not run if the set is restarted. + executor.shutdownNow(); + super.stop(); + setAttribute(Startable.SERVICE_UP, false); + } + + @Override + public void onManagementStopped() { + super.onManagementStopped(); + executor.shutdownNow(); + } + + public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { + @Override protected void onEntityChange(Entity member) { + // Ignored + } + @Override protected void onEntityAdded(Entity member) { + ((MongoDBReplicaSetImpl)entity).serverAdded((MongoDBServer) member); + } + @Override protected void onEntityRemoved(Entity member) { + ((MongoDBReplicaSetImpl)entity).serverRemoved((MongoDBServer) member); + } + }; +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServer.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServer.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServer.java new file mode 100644 index 0000000..5300684 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServer.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb; + +import org.bson.BasicBSONObject; + +import org.apache.brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.AttributeSensor.SensorPersistenceMode; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; + +@Catalog(name="MongoDB Server", + description="MongoDB (from \"humongous\") is a scalable, high-performance, open source NoSQL database", + iconUrl="classpath:///mongodb-logo.png") +@ImplementedBy(MongoDBServerImpl.class) +public interface MongoDBServer extends AbstractMongoDBServer { + + @SetFromFlag("mongodbConfTemplateUrl") + ConfigKey<String> MONGODB_CONF_TEMPLATE_URL = ConfigKeys.newConfigKeyWithDefault( + AbstractMongoDBServer.MONGODB_CONF_TEMPLATE_URL, + "classpath://org/apache/brooklyn/entity/nosql/mongodb/default-mongod.conf"); + + // See http://docs.mongodb.org/ecosystem/tools/http-interfaces/#http-console + // This is *always* 1000 more than port. We disable if it is not available. 
+ PortAttributeSensorAndConfigKey HTTP_PORT = + new PortAttributeSensorAndConfigKey("mongodb.server.httpPort", "HTTP port for the server (estimated)", "28017+"); + + @SetFromFlag("enableRestInterface") + ConfigKey<Boolean> ENABLE_REST_INTERFACE = ConfigKeys.newBooleanConfigKey( + "mongodb.config.enable_rest", "Adds --rest to server startup flags when true", Boolean.FALSE); + + AttributeSensor<String> HTTP_INTERFACE_URL = Sensors.newStringSensor( + "mongodb.server.http_interface", "URL of the server's HTTP console"); + + AttributeSensor<BasicBSONObject> STATUS_BSON = Sensors.builder(BasicBSONObject.class, "mongodb.server.status.bson") + .description("Server status (BSON/JSON map ojbect)") + .persistence(SensorPersistenceMode.NONE) + .build(); + + AttributeSensor<Double> UPTIME_SECONDS = Sensors.newDoubleSensor( + "mongodb.server.uptime", "Server uptime in seconds"); + + AttributeSensor<Long> OPCOUNTERS_INSERTS = Sensors.newLongSensor( + "mongodb.server.opcounters.insert", "Server inserts"); + + AttributeSensor<Long> OPCOUNTERS_QUERIES = Sensors.newLongSensor( + "mongodb.server.opcounters.query", "Server queries"); + + AttributeSensor<Long> OPCOUNTERS_UPDATES = Sensors.newLongSensor( + "mongodb.server.opcounters.update", "Server updates"); + + AttributeSensor<Long> OPCOUNTERS_DELETES = Sensors.newLongSensor( + "mongodb.server.opcounters.delete", "Server deletes"); + + AttributeSensor<Long> OPCOUNTERS_GETMORE = Sensors.newLongSensor( + "mongodb.server.opcounters.getmore", "Server getmores"); + + AttributeSensor<Long> OPCOUNTERS_COMMAND = Sensors.newLongSensor( + "mongodb.server.opcounters.command", "Server commands"); + + AttributeSensor<Long> NETWORK_BYTES_IN = Sensors.newLongSensor( + "mongodb.server.network.bytesIn", "Server incoming network traffic (in bytes)"); + + AttributeSensor<Long> NETWORK_BYTES_OUT = Sensors.newLongSensor( + "mongodb.server.network.bytesOut", "Server outgoing network traffic (in bytes)"); + + AttributeSensor<Long> NETWORK_NUM_REQUESTS = 
Sensors.newLongSensor( + "mongodb.server.network.numRequests", "Server network requests"); + + /** A single server's replica set configuration **/ + ConfigKey<MongoDBReplicaSet> REPLICA_SET = new BasicConfigKey<MongoDBReplicaSet>(MongoDBReplicaSet.class, + "mongodb.replicaset", "The replica set to which the server belongs. " + + "Users should not set this directly when creating a new replica set."); + + AttributeSensor<ReplicaSetMemberStatus> REPLICA_SET_MEMBER_STATUS = Sensors.newSensor( + ReplicaSetMemberStatus.class, "mongodb.server.replicaSet.memberStatus", "The status of this server in the replica set"); + + AttributeSensor<Boolean> IS_PRIMARY_FOR_REPLICA_SET = Sensors.newBooleanSensor( + "mongodb.server.replicaSet.isPrimary", "True if this server is the write master for the replica set"); + + AttributeSensor<Boolean> IS_SECONDARY_FOR_REPLICA_SET = Sensors.newBooleanSensor( + "mongodb.server.replicaSet.isSecondary", "True if this server is a secondary server in the replica set"); + + AttributeSensor<String> REPLICA_SET_PRIMARY_ENDPOINT = Sensors.newStringSensor( + "mongodb.server.replicaSet.primary.endpoint", "The host:port of the server which is acting as primary (master) for the replica set"); + + AttributeSensor<String> MONGO_SERVER_ENDPOINT = Sensors.newStringSensor( + "mongodb.server.endpoint", "The host:port where this server is listening"); + + /** + * @return The replica set the server belongs to, or null if the server is a standalone instance. + */ + MongoDBReplicaSet getReplicaSet(); + + /** + * @return True if the server is a child of {@link MongoDBReplicaSet}. + */ + boolean isReplicaSetMember(); + + /** + * Initialises a replica set at the server the method is invoked on. + * @param replicaSetName The name for the replica set. + * @param id The id to be given to this server in the replica set configuration. + * @return True if initialisation is successful. 
+ */ + boolean initializeReplicaSet(String replicaSetName, Integer id); + + /** + * Reconfigures the replica set that the server the method is invoked on is the primary member of + * to include a new member. + * <p/> + * Note that this can cause long downtime (typically 10-20s, even up to a minute). + * + * @param secondary New member of the set. + * @param id The id for the new set member. Must be unique within the set; its validity is not checked. + * @return True if addition is successful. False if the server this is called on is not the primary + * member of the replica set. + */ + boolean addMemberToReplicaSet(MongoDBServer secondary, Integer id); + + /** + * Reconfigures the replica set that the server the method is invoked on is the primary member of + * to remove the given server. + * @param server The server to remove. + * @return True if removal is successful. False if the server this is called on is not the primary + * member of the replica set. + */ + boolean removeMemberFromReplicaSet(MongoDBServer server); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java new file mode 100644 index 0000000..346b1ee --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb; + +import java.net.UnknownHostException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.bson.BasicBSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.render.RendererHints; +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.event.feed.function.FunctionFeed; +import brooklyn.event.feed.function.FunctionPollConfig; +import brooklyn.location.access.BrooklynAccessUtils; + +import com.google.common.base.Functions; +import com.google.common.base.Objects; +import com.google.common.net.HostAndPort; + +public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBServer { + + private static final Logger LOG = LoggerFactory.getLogger(MongoDBServerImpl.class); + + static { + RendererHints.register(HTTP_INTERFACE_URL, RendererHints.namedActionWithUrl()); + } + + private FunctionFeed serviceStats; + private FunctionFeed replicaSetStats; + private MongoDBClientSupport client; + + public MongoDBServerImpl() { + } + + @Override + public Class<?> getDriverInterface() { + return MongoDBDriver.class; + } + + @Override + protected void connectSensors() { + super.connectSensors(); + connectServiceUpIsRunning(); + + int port = 
getAttribute(MongoDBServer.PORT); + HostAndPort accessibleAddress = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, port); + setAttribute(MONGO_SERVER_ENDPOINT, String.format("http://%s:%d", + accessibleAddress.getHostText(), accessibleAddress.getPort())); + + int httpConsolePort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(HTTP_PORT)).getPort(); + setAttribute(HTTP_INTERFACE_URL, String.format("http://%s:%d", + accessibleAddress.getHostText(), httpConsolePort)); + + try { + client = MongoDBClientSupport.forServer(this); + } catch (UnknownHostException e) { + LOG.warn("Unable to create client connection to {}, not connecting sensors: {} ", this, e.getMessage()); + return; + } + + serviceStats = FunctionFeed.builder() + .entity(this) + .poll(new FunctionPollConfig<Object, BasicBSONObject>(STATUS_BSON) + .period(2, TimeUnit.SECONDS) + .callable(new Callable<BasicBSONObject>() { + @Override + public BasicBSONObject call() throws Exception { + return MongoDBServerImpl.this.getAttribute(SERVICE_UP) + ? client.getServerStatus() + : null; + } + }) + .onException(Functions.<BasicBSONObject>constant(null))) + .build(); + + if (isReplicaSetMember()) { + replicaSetStats = FunctionFeed.builder() + .entity(this) + .poll(new FunctionPollConfig<Object, ReplicaSetMemberStatus>(REPLICA_SET_MEMBER_STATUS) + .period(2, TimeUnit.SECONDS) + .callable(new Callable<ReplicaSetMemberStatus>() { + /** + * Calls {@link MongoDBClientSupport#getReplicaSetStatus} and + * extracts <code>myState</code> from the response. + * @return + * The appropriate {@link org.apache.brooklyn.entity.nosql.mongodb.ReplicaSetMemberStatus} + * if <code>myState</code> was non-null, {@link ReplicaSetMemberStatus#UNKNOWN} otherwise. 
+ */ + @Override + public ReplicaSetMemberStatus call() { + BasicBSONObject serverStatus = client.getReplicaSetStatus(); + int state = serverStatus.getInt("myState", -1); + return ReplicaSetMemberStatus.fromCode(state); + } + }) + .onException(Functions.constant(ReplicaSetMemberStatus.UNKNOWN))) + .build(); + } else { + setAttribute(IS_PRIMARY_FOR_REPLICA_SET, false); + setAttribute(IS_SECONDARY_FOR_REPLICA_SET, false); + } + + // Take interesting details from STATUS. + subscribe(this, STATUS_BSON, new SensorEventListener<BasicBSONObject>() { + @Override public void onEvent(SensorEvent<BasicBSONObject> event) { + BasicBSONObject map = event.getValue(); + if (map != null && !map.isEmpty()) { + setAttribute(UPTIME_SECONDS, map.getDouble("uptime", 0)); + + // Operations + BasicBSONObject opcounters = (BasicBSONObject) map.get("opcounters"); + setAttribute(OPCOUNTERS_INSERTS, opcounters.getLong("insert", 0)); + setAttribute(OPCOUNTERS_QUERIES, opcounters.getLong("query", 0)); + setAttribute(OPCOUNTERS_UPDATES, opcounters.getLong("update", 0)); + setAttribute(OPCOUNTERS_DELETES, opcounters.getLong("delete", 0)); + setAttribute(OPCOUNTERS_GETMORE, opcounters.getLong("getmore", 0)); + setAttribute(OPCOUNTERS_COMMAND, opcounters.getLong("command", 0)); + + // Network stats + BasicBSONObject network = (BasicBSONObject) map.get("network"); + setAttribute(NETWORK_BYTES_IN, network.getLong("bytesIn", 0)); + setAttribute(NETWORK_BYTES_OUT, network.getLong("bytesOut", 0)); + setAttribute(NETWORK_NUM_REQUESTS, network.getLong("numRequests", 0)); + + // Replica set stats + BasicBSONObject repl = (BasicBSONObject) map.get("repl"); + if (isReplicaSetMember() && repl != null) { + setAttribute(IS_PRIMARY_FOR_REPLICA_SET, repl.getBoolean("ismaster")); + setAttribute(IS_SECONDARY_FOR_REPLICA_SET, repl.getBoolean("secondary")); + setAttribute(REPLICA_SET_PRIMARY_ENDPOINT, repl.getString("primary")); + } + } + } + }); + } + + @Override + protected void disconnectSensors() { + 
super.disconnectSensors(); + disconnectServiceUpIsRunning(); + if (serviceStats != null) serviceStats.stop(); + if (replicaSetStats != null) replicaSetStats.stop(); + } + + @Override + public MongoDBReplicaSet getReplicaSet() { + return getConfig(MongoDBServer.REPLICA_SET); + } + + @Override + public boolean isReplicaSetMember() { + return getReplicaSet() != null; + } + + @Override + public boolean initializeReplicaSet(String replicaSetName, Integer id) { + return client.initializeReplicaSet(replicaSetName, id); + } + + @Override + public boolean addMemberToReplicaSet(MongoDBServer secondary, Integer id) { + // TODO The attributes IS_PRIMARY_FOR_REPLICA_SET and REPLICA_SET_MEMBER_STATUS can be out-of-sync. + // The former is obtained by an enricher that listens to STATUS_BSON (set by client.getServerStatus()). + // The latter is set by a different feed doing client.getReplicaSetStatus().getInt("myState"). + // The ReplicaSet uses REPLICA_SET_MEMBER_STATUS to determine which node to call. + // + // Relying on caller to respect the `false` result, to retry. 
+ if (!getAttribute(IS_PRIMARY_FOR_REPLICA_SET)) { + LOG.warn("Attempted to add {} to replica set at server that is not primary: {}", secondary, this); + return false; + } + return client.addMemberToReplicaSet(secondary, id); + } + + @Override + public boolean removeMemberFromReplicaSet(MongoDBServer server) { + if (!getAttribute(IS_PRIMARY_FOR_REPLICA_SET)) { + LOG.warn("Attempted to remove {} from replica set at server that is not primary: {}", server, this); + return false; + } + return client.removeMemberFromReplicaSet(server); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("id", getId()) + .add("hostname", getAttribute(HOSTNAME)) + .add("port", getAttribute(PORT)) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBSshDriver.java new file mode 100644 index 0000000..819014d --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBSshDriver.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb; + +import static com.google.common.base.Preconditions.checkState; +import brooklyn.location.basic.SshMachineLocation; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; + +public class MongoDBSshDriver extends AbstractMongoDBSshDriver implements MongoDBDriver { + + public MongoDBSshDriver(MongoDBServerImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + public MongoDBServerImpl getEntity() { + return MongoDBServerImpl.class.cast(super.getEntity()); + } + + @Override + public void launch() { + MongoDBServer server = getEntity(); + + ImmutableList.Builder<String> argsBuilder = getArgsBuilderWithDefaults(server) + .add("--dbpath", getDataDirectory()); + + if (server.isReplicaSetMember()) { + String replicaSetName = server.getReplicaSet().getName(); + checkState(!Strings.isNullOrEmpty(replicaSetName), "Replica set name must not be null or empty"); + argsBuilder.add("--replSet", replicaSetName); + } + + if (Boolean.TRUE.equals(server.getConfig(MongoDBServer.ENABLE_REST_INTERFACE))) + argsBuilder.add("--rest"); + + launch(argsBuilder); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetConfig.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetConfig.java 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetConfig.java new file mode 100644 index 0000000..a4ecebb --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetConfig.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Iterator; + +import org.bson.BSONObject; +import org.bson.BasicBSONObject; +import org.bson.types.BasicBSONList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.net.HostAndPort; + +import brooklyn.location.access.BrooklynAccessUtils; + +/** + * Simplifies the creation of configuration objects for Mongo DB replica sets. 
+ * <p/> + * A configuration object is structured like this: + * <pre> + * { + * "_id" : "replica-set-name", + * "version" : 3, + * "members" : [ + * { "_id" : 0, "host" : "Sams.local:27017" }, + * { "_id" : 1, "host" : "Sams.local:27018" }, + * { "_id" : 2, "host" : "Sams.local:27019" } + * ] + * } + * </pre> + * To add or remove servers to a replica set you must redefine this configuration + * (run <code>replSetReconfig</code> on the primary) with the new <code>members</code> + * list and the <code>version</code> updated. + */ +public class ReplicaSetConfig { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicaSetConfig.class); + static final int MAXIMUM_REPLICA_SET_SIZE = 12; + static final int MAXIMUM_VOTING_MEMBERS = 7; + + private Optional<HostAndPort> primary = Optional.absent(); + + private String name; + private Integer version; + BasicBSONList members; + + public ReplicaSetConfig(String name) { + this(name, new BasicBSONList()); + } + + public ReplicaSetConfig(String name, BasicBSONList existingMembers) { + this.name = name; + this.members = existingMembers; + this.version = 1; + } + + /** + * Creates a configuration with the given name. + */ + public static ReplicaSetConfig builder(String name) { + return new ReplicaSetConfig(name); + } + + /** + * Creates a configuration from an existing configuration. + * <p/> + * Automatically increments the replica set's version number. 
+ */ + public static ReplicaSetConfig fromExistingConfig(BSONObject config) { + checkNotNull(config); + checkArgument(config.containsField("_id"), "_id missing from replica set config"); + checkArgument(config.containsField("version"), "version missing from replica set config"); + checkArgument(config.containsField("members"), "members missing from replica set config"); + + String name = (String) config.get("_id"); + Integer version = (Integer) config.get("version"); + BasicBSONList members = (BasicBSONList) config.get("members"); + + return new ReplicaSetConfig(name, members).version(++version); + } + + /** + * Sets the version of the configuration. The version number must increase as the replica set changes. + */ + public ReplicaSetConfig version(Integer version) { + this.version = version; + return this; + } + + /** + * Notes the primary member of the replica. Primary members will always be voting members. + */ + public ReplicaSetConfig primary(HostAndPort primary) { + this.primary = Optional.of(primary); + return this; + } + + /** + * Adds a new member to the replica set config using {@link MongoDBServer#HOSTNAME} and {@link MongoDBServer#PORT} + * for hostname and port. Doesn't attempt to check that the id is free. + */ + public ReplicaSetConfig member(MongoDBServer server, Integer id) { + // TODO: Switch to SUBNET_HOSTNAME and there should be no need for a Brooklyn accessible + // address. It will require modification to MongoDBClientSupport, though, since it sets + // the primary to the host/port accessible from Brooklyn. + HostAndPort hap = BrooklynAccessUtils.getBrooklynAccessibleAddress(server, server.getAttribute(MongoDBServer.PORT)); + return member(hap.getHostText(), hap.getPort(), id); + } + + /** + * Adds a new member to the replica set config using the given {@link HostAndPort} for hostname and port. + * Doesn't attempt to check that the id is free. 
+ */ + public ReplicaSetConfig member(HostAndPort address, Integer id) { + return member(address.getHostText(), address.getPort(), id); + } + + /** + * Adds a new member to the replica set config with the given hostname, port and id. Doesn't attempt to check + * that the id is free. + * @throws IllegalStateException if the set already contains {@link #MAXIMUM_REPLICA_SET_SIZE} members. + */ + public ReplicaSetConfig member(String hostname, Integer port, Integer id) { + if (members.size() == MAXIMUM_REPLICA_SET_SIZE) { + // String.format needs %s placeholders; SLF4J-style {} would be emitted literally. + throw new IllegalStateException(String.format( + "Replica set %s exceeds maximum size of %s with addition of member at %s:%s", + name, MAXIMUM_REPLICA_SET_SIZE, hostname, port)); + } + BasicBSONObject member = new BasicBSONObject(); + member.put("_id", id); + member.put("host", String.format("%s:%s", hostname, port)); + members.add(member); + return this; + } + + /** Removes the first entity using {@link MongoDBServer#HOSTNAME} and {@link MongoDBServer#PORT}. */ + public ReplicaSetConfig remove(MongoDBServer server) { + HostAndPort hap = BrooklynAccessUtils.getBrooklynAccessibleAddress(server, server.getAttribute(MongoDBServer.PORT)); + return remove(hap.getHostText(), hap.getPort()); + } + + /** Removes the first entity with host and port matching the given address. */ + public ReplicaSetConfig remove(HostAndPort address) { + return remove(address.getHostText(), address.getPort()); + } + + /** + * Removes the first entity with the given hostname and port from the list of members + */ + public ReplicaSetConfig remove(String hostname, Integer port) { + String host = String.format("%s:%s", hostname, port); + Iterator<Object> it = this.members.iterator(); + while (it.hasNext()) { + Object next = it.next(); + if (next instanceof BasicBSONObject) { + BasicBSONObject basicBSONObject = (BasicBSONObject) next; + if (host.equals(basicBSONObject.getString("host"))) { + it.remove(); + break; + } + } + } + return this; + } + + /** + * @return A {@link BasicBSONObject} representing the configuration that is suitable for a MongoDB server. 
+ */ + public BasicBSONObject build() { + setVotingMembers(); + BasicBSONObject config = new BasicBSONObject(); + config.put("_id", name); + config.put("version", version); + config.put("members", members); + return config; + } + + /** + * Selects 1, 3, 5 or 7 members to have a vote. The primary member (as set by + * {@link #primary(com.google.common.net.HostAndPort)}) is guaranteed a vote if + * it is in {@link #members}. + * <p/> + * + * Reconfiguring servers to be voters when they previously did not have votes generally triggers + * a primary election. This confuses the MongoDB Java driver, which logs an error like: + * <pre> + * WARN emptying DBPortPool to sams.home/192.168.1.64:27019 b/c of error + * java.io.EOFException: null + * at org.bson.io.Bits.readFully(Bits.java:48) ~[mongo-java-driver-2.11.3.jar:na] + * WARN Command { "replSetReconfig" : ... } on sams.home/192.168.1.64:27019 failed + * com.mongodb.MongoException$Network: Read operation to server sams.home/192.168.1.64:27019 failed on database admin + * at com.mongodb.DBTCPConnector.innerCall(DBTCPConnector.java:253) ~[mongo-java-driver-2.11.3.jar:na] + * Caused by: java.io.EOFException: null + * at org.bson.io.Bits.readFully(Bits.java:48) ~[mongo-java-driver-2.11.3.jar:na] + * </pre> + * + * The MongoDB documentation on <a href="http://docs.mongodb.org/manual/tutorial/configure-a-non-voting-replica-set-member/"> + * non-voting members</a> says: + * <blockquote> + * Initializes a new replica set configuration. Disconnects the shell briefly and forces a + * reconnection as the replica set renegotiates which member will be primary. As a result, + * the shell will display an error even if this command succeeds. + * </blockquote> + * + * So the problem is more that the MongoDB Java driver does not understand why the server + * may have disconnected and is too eager to report a problem. 
+ */ + private void setVotingMembers() { + if (LOG.isDebugEnabled()) + LOG.debug("Setting voting and non-voting members of replica set: {}", name); + boolean seenPrimary = false; + String expectedPrimary = primary.isPresent() + ? primary.get().getHostText() + ":" + primary.get().getPort() + : ""; + + // Ensure an odd number of voters + int setSize = this.members.size(); + int nonPrimaryVotingMembers = Math.min(setSize % 2 == 0 ? setSize - 1 : setSize, MAXIMUM_VOTING_MEMBERS); + if (primary.isPresent()) { + if (LOG.isTraceEnabled()) + LOG.trace("Reserving vote for primary: " + expectedPrimary); + nonPrimaryVotingMembers -= 1; + } + + for (Object member : this.members) { + if (member instanceof BasicBSONObject) { + BasicBSONObject bsonObject = BasicBSONObject.class.cast(member); + String host = bsonObject.getString("host"); + + // is this member noted as the primary? + if (this.primary.isPresent() && expectedPrimary.equals(host)) { + bsonObject.put("votes", 1); + seenPrimary = true; + if (LOG.isDebugEnabled()) + LOG.debug("Voting member (primary) of set {}: {}", name, host); + } else if (nonPrimaryVotingMembers-- > 0) { + bsonObject.put("votes", 1); + if (LOG.isDebugEnabled()) + LOG.debug("Voting member of set {}: {}", name, host); + } else { + bsonObject.put("votes", 0); + if (LOG.isDebugEnabled()) + LOG.debug("Non-voting member of set {}: {}", name, host); + } + } else { + LOG.error("Unexpected entry in replica set members list: " + member); + } + } + + if (primary.isPresent() && !seenPrimary) { + LOG.warn("Cannot give replica set primary a vote in reconfigured set: " + + "primary was indicated as {} but no member with that host and port was seen in the set. 
" + + "The replica set now has an even number of voters.", + this.primary); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetMemberStatus.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetMemberStatus.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetMemberStatus.java new file mode 100644 index 0000000..16df3a7 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/ReplicaSetMemberStatus.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.mongodb; + +/** + * @see <a href="http://docs.mongodb.org/manual/reference/replica-status/">Replica set status reference</a> + */ +public enum ReplicaSetMemberStatus { + + STARTUP("Start up, phase 1 (parsing configuration)"), + PRIMARY("Primary"), + SECONDARY("Secondary"), + RECOVERING("Member is recovering (initial sync, post-rollback, stale members)"), + FATAL("Member has encountered an unrecoverable error"), + STARTUP2("Start up, phase 2 (forking threads)"), + UNKNOWN("Unknown (the set has never connected to the member)"), + ARBITER("Member is an arbiter"), + DOWN("Member is not accessible to the set"), + ROLLBACK("Member is rolling back data. See rollback"), + SHUNNED("Member has been removed from replica set"); + + private final String description; + + ReplicaSetMemberStatus(String description) { + this.description = description; + } + + public static ReplicaSetMemberStatus fromCode(int code) { + switch (code) { + case 0: return STARTUP; + case 1: return PRIMARY; + case 2: return SECONDARY; + case 3: return RECOVERING; + case 4: return FATAL; + case 5: return STARTUP2; + case 6: return UNKNOWN; + case 7: return ARBITER; + case 8: return DOWN; + case 9: return ROLLBACK; + case 10: return SHUNNED; + default: return UNKNOWN; + } + } + + @Override + public String toString() { + return name() + ": " + description; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouter.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouter.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouter.java new file mode 100644 index 0000000..48c9c63 --- /dev/null +++ 
b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import java.util.List; +import java.util.Map; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SameServerEntity; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; + +import com.google.common.reflect.TypeToken; + +@ImplementedBy(CoLocatedMongoDBRouterImpl.class) +public interface CoLocatedMongoDBRouter extends SameServerEntity { + @SuppressWarnings("serial") + @SetFromFlag("siblingSpecs") + ConfigKey<Iterable<EntitySpec<?>>> SIBLING_SPECS = ConfigKeys.newConfigKey(new TypeToken<Iterable<EntitySpec<?>>>(){}, + "mongodb.colocatedrouter.sibling.specs", "Collection of (configured) specs for entities to be co-located with the router"); + + @SetFromFlag("shardedDeployment") + ConfigKey<MongoDBShardedDeployment> SHARDED_DEPLOYMENT = 
ConfigKeys.newConfigKey(MongoDBShardedDeployment.class, + "mongodb.colocatedrouter.shardeddeployment", "Sharded deployment to which the router should report"); + + /** Deprecated since 0.7.0 use {@link #PROPAGATING_SENSORS} instead. */ + @Deprecated + @SuppressWarnings("serial") + @SetFromFlag("propogatingSensors") + ConfigKey<List<Map<String, ?>>> PROPOGATING_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<Map<String, ?>>>(){}, + "mongodb.colocatedrouter.propogating.sensors", "List of sensors to be propogated from child members"); + + @SetFromFlag("propagatingSensors") + ConfigKey<List<Map<String, ?>>> PROPAGATING_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<Map<String, ?>>>(){}, + "mongodb.colocatedrouter.propagating.sensors", "List of sensors to be propogated from child members"); + + public static AttributeSensor<MongoDBRouter> ROUTER = Sensors.newSensor(MongoDBRouter.class, + "mongodb.colocatedrouter.router", "Router"); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouterImpl.java new file mode 100644 index 0000000..35252ae --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/CoLocatedMongoDBRouterImpl.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import java.util.Collection; + +import brooklyn.enricher.Enrichers; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.SameServerEntityImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.location.Location; + +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; + +public class CoLocatedMongoDBRouterImpl extends SameServerEntityImpl implements CoLocatedMongoDBRouter { + @Override + public void init() { + super.init(); + + for (EntitySpec<?> siblingSpec : getConfig(CoLocatedMongoDBRouter.SIBLING_SPECS)) { + addChild(siblingSpec); + } + } + + @Override + protected void doStart(Collection<? extends Location> locations) { + // TODO Changed to create the router child after init as a workaround. + // When we use `mongo-sharded.yaml`, and we call + // `getConfig(CoLocatedMongoDBRouter.SHARDED_DEPLOYMENT)`, + // the value is `$brooklyn:component("shardeddeployment")`. + // To look up the component, it tries to do `entity().getApplication()` to + // search the entities for one with the correct id. However if being done + // during `init()`, then this (which is returned by `entity()`) has not had its parent + // set, so `entity().getApplication()` returns null. 
+ // + // We should move this code back to `init()` once we have a solution for that. + // We can also remove the call to Entities.manage() once this is in init() again. + + MongoDBRouter router = addChild(EntitySpec.create(MongoDBRouter.class) + .configure(MongoDBRouter.CONFIG_SERVERS, + DependentConfiguration.attributeWhenReady( + getConfig(CoLocatedMongoDBRouter.SHARDED_DEPLOYMENT), + MongoDBConfigServerCluster.CONFIG_SERVER_ADDRESSES))); + Entities.manage(router); + setAttribute(ROUTER, (MongoDBRouter) Iterables.tryFind(getChildren(), Predicates.instanceOf(MongoDBRouter.class)).get()); + addEnricher(Enrichers.builder().propagating(MongoDBRouter.PORT).from(router).build()); + + super.doStart(locations); + setAttribute(Startable.SERVICE_UP, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServer.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServer.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServer.java new file mode 100644 index 0000000..acecbc4 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import org.apache.brooklyn.entity.nosql.mongodb.AbstractMongoDBServer; + +import brooklyn.entity.proxying.ImplementedBy; + +@ImplementedBy(MongoDBConfigServerImpl.class) +public interface MongoDBConfigServer extends AbstractMongoDBServer { + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerCluster.java new file mode 100644 index 0000000..79f78ac --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerCluster.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.Sensors; + +import com.google.common.reflect.TypeToken; + +@ImplementedBy(MongoDBConfigServerClusterImpl.class) +public interface MongoDBConfigServerCluster extends DynamicCluster { + + @SuppressWarnings("serial") + AttributeSensor<Iterable<String>> CONFIG_SERVER_ADDRESSES = Sensors.newSensor(new TypeToken<Iterable<String>>() {}, + "mongodb.config.server.addresses", "List of config server hostnames and ports"); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerClusterImpl.java new file mode 100644 index 0000000..34651bb --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerClusterImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import java.util.Collection; + +import brooklyn.entity.Entity; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.location.Location; +import brooklyn.location.access.BrooklynAccessUtils; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.net.HostAndPort; + +public class MongoDBConfigServerClusterImpl extends DynamicClusterImpl implements MongoDBConfigServerCluster { + + @Override + protected EntitySpec<?> getMemberSpec() { + if (super.getMemberSpec() != null) + return super.getMemberSpec(); + return EntitySpec.create(MongoDBConfigServer.class); + } + + @Override + public void start(Collection<? 
extends Location> locs) { + super.start(locs); + + // TODO this should be an enricher + Iterable<String> memberHostNamesAndPorts = Iterables.transform(getMembers(), new Function<Entity, String>() { + @Override + public String apply(Entity entity) { + return entity.getAttribute(MongoDBConfigServer.SUBNET_HOSTNAME) + ":" + entity.getAttribute(MongoDBConfigServer.PORT); + } + }); + setAttribute(MongoDBConfigServerCluster.CONFIG_SERVER_ADDRESSES, ImmutableList.copyOf(memberHostNamesAndPorts)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerDriver.java new file mode 100644 index 0000000..7963b22 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerDriver.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import brooklyn.entity.basic.SoftwareProcessDriver; + +public interface MongoDBConfigServerDriver extends SoftwareProcessDriver { + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerImpl.java new file mode 100644 index 0000000..b8ce2b8 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerImpl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import brooklyn.entity.basic.SoftwareProcessImpl; + +public class MongoDBConfigServerImpl extends SoftwareProcessImpl implements MongoDBConfigServer { + + @Override + public Class<?> getDriverInterface() { + return MongoDBConfigServerDriver.class; + } + + @Override + protected void connectSensors() { + super.connectSensors(); + connectServiceUpIsRunning(); + } + +}