Added support for elastic search cluster
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/587e6999 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/587e6999 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/587e6999 Branch: refs/heads/master Commit: 587e6999d12bdf71d53dc0cb553eb31927689e88 Parents: e409c68 Author: Martin Harris <[email protected]> Authored: Wed Jun 4 12:17:22 2014 +0100 Committer: Martin Harris <[email protected]> Committed: Tue Jun 17 10:19:58 2014 +0100 ---------------------------------------------------------------------- .../elasticsearch/ElasticSearchCluster.java | 22 ++++ .../elasticsearch/ElasticSearchClusterImpl.java | 104 +++++++++++++++++++ .../nosql/elasticsearch/ElasticSearchNode.java | 30 ++++-- .../elasticsearch/ElasticSearchNodeImpl.java | 29 +++++- .../ElasticSearchNodeSshDriver.java | 12 ++- 5 files changed, 181 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java new file mode 100644 index 0000000..4f6448f --- /dev/null +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java @@ -0,0 +1,22 @@ +package brooklyn.entity.nosql.elasticsearch; + +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; + +/** + * A 
cluster of {@link ElasticSearchNode}s based on {@link DynamicCluster} which can be resized by a policy if required. + */ +@ImplementedBy(ElasticSearchClusterImpl.class) +public interface ElasticSearchCluster extends DynamicCluster { + @SetFromFlag("clusterName") + BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, + "elasticsearch.cluster.name", "Name of the ElasticSearch cluster", "BrooklynCluster"); + + AttributeSensor<String> NODE_LIST = Sensors.newStringSensor("elasticsearch.cluster.node.list", "Comma delimited list of nodes in hostname:port format"); + + String getName(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java new file mode 100644 index 0000000..5e1deb6 --- /dev/null +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java @@ -0,0 +1,104 @@ +package brooklyn.entity.nosql.elasticsearch; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.group.AbstractMembershipTrackingPolicy; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.location.Location; +import brooklyn.policy.PolicySpec; +import brooklyn.util.text.Strings; + +public class ElasticSearchClusterImpl extends DynamicClusterImpl implements ElasticSearchCluster { + + private AtomicInteger nextMemberId = new AtomicInteger(0); + private MemberTrackingPolicy policy; + + public ElasticSearchClusterImpl() { + + } + + 
@Override + public void init() { + policy = addPolicy(PolicySpec.create(MemberTrackingPolicy.class) + .displayName(getName() + " membership tracker") + .configure("group", this) + .configure(AbstractMembershipTrackingPolicy.NOTIFY_ON_DUPLICATES, false)); + } + + @Override + public void start(Collection<? extends Location> locations) { + super.start(locations); + } + + @Override + public void stop() { + if (policy != null) { + removePolicy(policy); + } + super.stop(); + } + + @Override + protected boolean calculateServiceUp() { + boolean up = false; + for (Entity member : getMembers()) { + if (Boolean.TRUE.equals(member.getAttribute(SERVICE_UP))) up = true; + } + return up; + } + + @Override + protected EntitySpec<?> getMemberSpec() { + @SuppressWarnings("unchecked") + EntitySpec<ElasticSearchNode> spec = (EntitySpec<ElasticSearchNode>)getConfig(MEMBER_SPEC, EntitySpec.create(ElasticSearchNode.class)); + + spec.configure(ElasticSearchNode.CLUSTER_NAME, getConfig(ElasticSearchClusterImpl.CLUSTER_NAME)) + .configure(ElasticSearchNode.MULTICAST_ENABLED, false) + .configure(ElasticSearchNode.UNICAST_ENABLED, false) + .configure(ElasticSearchNode.NODE_NAME, "elasticsearch-" + nextMemberId.incrementAndGet()); + + return spec; + } + + @Override + public String getName() { + return getConfig(CLUSTER_NAME); + } + + private void resetCluster() { + String nodeList = ""; + for (Entity entity : getMembers()) { + nodeList += getHostAndPort(entity) + ","; + } + if (!nodeList.isEmpty()) { + for (Entity entity : getMembers()) { + String otherNodesList = Strings.removeFromEnd(nodeList.replace(getHostAndPort(entity) + ",", ""), ","); + if (!otherNodesList.isEmpty()) { + ((ElasticSearchNode)entity).resetCluster(otherNodesList); + } + } + + } + setAttribute(NODE_LIST, Strings.removeFromEnd(nodeList, ",")); + } + + private String getHostAndPort(Entity entity) { + return entity.getAttribute(Attributes.HOSTNAME) + ":" + entity.getAttribute(Attributes.HTTP_PORT); + } + + public static class 
MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { + @Override protected void onEntityChange(Entity member) { + ((ElasticSearchClusterImpl)entity).resetCluster(); + } + @Override protected void onEntityAdded(Entity member) { + ((ElasticSearchClusterImpl)entity).resetCluster(); + } + @Override protected void onEntityRemoved(Entity member) { + ((ElasticSearchClusterImpl)entity).resetCluster(); + } + }; +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java index 1eb4fda..e9ec525 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNode.java @@ -27,14 +27,22 @@ public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasD SoftwareProcess.DOWNLOAD_URL, "https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-${version}.tar.gz"); @SetFromFlag("dataDir") - ConfigKey<String> DATA_DIR = ConfigKeys.newStringConfigKey("elasticsearch.data.dir", "Directory for writing data files", null); + ConfigKey<String> DATA_DIR = ConfigKeys.newStringConfigKey("elasticsearch.node.data.dir", "Directory for writing data files", null); @SetFromFlag("logDir") - ConfigKey<String> LOG_DIR = ConfigKeys.newStringConfigKey("elasticsearch.log.dir", "Directory for writing log files", null); + ConfigKey<String> LOG_DIR = ConfigKeys.newStringConfigKey("elasticsearch.node.log.dir", "Directory for writing log files", null); @SetFromFlag("configFileUrl") ConfigKey<String> TEMPLATE_CONFIGURATION_URL = ConfigKeys.newStringConfigKey( - 
"elasticsearch.template.configuration.url", "URL where the elasticsearch configuration file (in freemarker format) can be found"); + "elasticsearch.node.template.configuration.url", "URL where the elasticsearch configuration file (in freemarker format) can be found", null); + + @SetFromFlag("multicastEnabled") + ConfigKey<Boolean> MULTICAST_ENABLED = ConfigKeys.newBooleanConfigKey("elasticsearch.node.multicast.enabled", + "Indicates whether zen discovery multicast should be enabled for a node", null); + + @SetFromFlag("unicastEnabled") + ConfigKey<Boolean> UNICAST_ENABLED = ConfigKeys.newBooleanConfigKey("elasticsearch.node.UNicast.enabled", + "Indicates whether zen discovery unicast should be enabled for a node", null); @SetFromFlag("httpPort") PortAttributeSensorAndConfigKey HTTP_PORT = new PortAttributeSensorAndConfigKey(WebAppServiceConstants.HTTP_PORT, PortRanges.fromString("9200+")); @@ -44,14 +52,16 @@ public interface ElasticSearchNode extends SoftwareProcess, DatastoreMixins.HasD "Node name (or randomly selected if not set", null); @SetFromFlag("clusterName") - StringAttributeSensorAndConfigKey CLUSTER_NAME = new StringAttributeSensorAndConfigKey("elasticsearch.cluster.name", + StringAttributeSensorAndConfigKey CLUSTER_NAME = new StringAttributeSensorAndConfigKey("elasticsearch.node.cluster.name", "Cluster name (or elasticsearch selected if not set", null); AttributeSensor<String> NODE_ID = Sensors.newStringSensor("elasticsearch.node.id"); - AttributeSensor<Integer> DOCUMENT_COUNT = Sensors.newIntegerSensor("elasticsearch.docs.count"); - AttributeSensor<Integer> STORE_BYTES = Sensors.newIntegerSensor("elasticsearch.store.bytes"); - AttributeSensor<Integer> GET_TOTAL = Sensors.newIntegerSensor("elasticsearch.get.total"); - AttributeSensor<Integer> GET_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.get.time.in.millis"); - AttributeSensor<Integer> SEARCH_QUERY_TOTAL = Sensors.newIntegerSensor("elasticsearch.search.query.total"); - 
AttributeSensor<Integer> SEARCH_QUERY_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.search.query.time.in.millis"); + AttributeSensor<Integer> DOCUMENT_COUNT = Sensors.newIntegerSensor("elasticsearch.node.docs.count"); + AttributeSensor<Integer> STORE_BYTES = Sensors.newIntegerSensor("elasticsearch.node.store.bytes"); + AttributeSensor<Integer> GET_TOTAL = Sensors.newIntegerSensor("elasticsearch.node.get.total"); + AttributeSensor<Integer> GET_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.node.get.time.in.millis"); + AttributeSensor<Integer> SEARCH_QUERY_TOTAL = Sensors.newIntegerSensor("elasticsearch.node.search.query.total"); + AttributeSensor<Integer> SEARCH_QUERY_TIME_IN_MILLIS = Sensors.newIntegerSensor("elasticsearch.node.search.query.time.in.millis"); + + void resetCluster(String nodeList); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java index 062e14b..fafe67f 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java @@ -1,16 +1,27 @@ package brooklyn.entity.nosql.elasticsearch; import static com.google.common.base.Preconditions.checkNotNull; + +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.http.client.HttpClient; +import org.bouncycastle.util.Strings; + +import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.SoftwareProcessImpl; import brooklyn.event.feed.http.HttpFeed; import brooklyn.event.feed.http.HttpPollConfig; import 
brooklyn.event.feed.http.HttpValueFunctions; import brooklyn.event.feed.http.JsonFunctions; import brooklyn.location.access.BrooklynAccessUtils; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.http.HttpTool; import brooklyn.util.http.HttpToolResponse; import com.google.common.base.Function; import com.google.common.base.Functions; +import com.google.common.collect.ImmutableMap; import com.google.common.net.HostAndPort; import com.google.gson.JsonElement; @@ -61,9 +72,6 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti .poll(new HttpPollConfig<String>(NODE_NAME) .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("name"), JsonFunctions.cast(String.class))) .onFailureOrException(Functions.<String>constant(null))) - .poll(new HttpPollConfig<String>(CLUSTER_NAME) - .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("settings", "cluster", "name"), JsonFunctions.cast(String.class))) - .onFailureOrException(Functions.<String>constant(null))) .poll(new HttpPollConfig<Integer>(DOCUMENT_COUNT) .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "docs", "count"), JsonFunctions.cast(Integer.class))) .onFailureOrException(Functions.<Integer>constant(null))) @@ -88,6 +96,21 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti } @Override + public void resetCluster(String nodeList) { + URI updateClusterUri; + try { + updateClusterUri = new URI(String.format("http://%s:%s/_cluster/settings", getAttribute(Attributes.HOSTNAME), getAttribute(HTTP_PORT))); + } catch (URISyntaxException e) { + throw Exceptions.propagate(e); + } + HttpClient client = HttpTool.httpClientBuilder().build(); + + String payload = String.format("{\"persistent\":{\"discovery.zen.ping.unicast.hosts\":\"%s\"}}", nodeList); + + HttpToolResponse result = HttpTool.httpPut(client, updateClusterUri, ImmutableMap.<String, String>of(), Strings.toByteArray(payload)); + } + + 
@Override protected void disconnectSensors() { if (httpFeed != null) { httpFeed.stop(); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/587e6999/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java index 3a695cb..2dda090 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java @@ -33,6 +33,7 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver List<String> commands = ImmutableList.<String>builder() .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) + .add(BashCommands.installJavaLatestOrFail()) .add(String.format("tar zxvf %s", saveAs)) .build(); @@ -70,6 +71,8 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver appendConfigIfPresent(commandBuilder, ElasticSearchNode.LOG_DIR, "es.path.logs", Os.mergePaths(getRunDir(), "logs")); appendConfigIfPresent(commandBuilder, ElasticSearchNode.NODE_NAME.getConfigKey(), "es.node.name"); appendConfigIfPresent(commandBuilder, ElasticSearchNode.CLUSTER_NAME.getConfigKey(), "es.cluster.name"); + appendConfigIfPresent(commandBuilder, ElasticSearchNode.MULTICAST_ENABLED, "es.discovery.zen.ping.multicast.enabled"); + appendConfigIfPresent(commandBuilder, ElasticSearchNode.UNICAST_ENABLED, "es.discovery.zen.ping.unicast.enabled"); commandBuilder.append(" > out.log 2> err.log < /dev/null"); newScript(MutableMap.of("usePidFile", false), LAUNCHING) .updateTaskAndFailOnNonZeroResultCode() @@ -77,12 +80,15 @@ public class ElasticSearchNodeSshDriver extends 
AbstractSoftwareProcessSshDriver .execute(); } - private void appendConfigIfPresent(StringBuilder builder, ConfigKey<String> configKey, String parameter) { + private void appendConfigIfPresent(StringBuilder builder, ConfigKey<?> configKey, String parameter) { appendConfigIfPresent(builder, configKey, parameter, null); } - private void appendConfigIfPresent(StringBuilder builder, ConfigKey<String> configKey, String parameter, String defaultValue) { - String config = entity.getConfig(configKey); + private void appendConfigIfPresent(StringBuilder builder, ConfigKey<?> configKey, String parameter, String defaultValue) { + String config = null; + if (entity.getConfig(configKey) != null) { + config = String.valueOf(entity.getConfig(configKey)); + } if (config == null && defaultValue != null) { config = defaultValue; }
