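Updated ElasticSearch entities following PR review: renamed ElasticSearchCluster.getName() to getClusterName(), removed empty constructors and a redundant start() override, consolidated node-stat sensor polling into a getSensorFromNodeStat() helper using Maybe-based JSON functions, and moved the Java install ahead of the download step in the SSH driver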
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/6572c027 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/6572c027 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/6572c027 Branch: refs/heads/master Commit: 6572c027b6e33dcded034113f19d9d4a28fe8fff Parents: 20507af Author: Martin Harris <[email protected]> Authored: Tue Jun 17 12:31:48 2014 +0100 Committer: Martin Harris <[email protected]> Committed: Tue Jun 17 12:31:48 2014 +0100 ---------------------------------------------------------------------- .../elasticsearch/ElasticSearchCluster.java | 4 +- .../elasticsearch/ElasticSearchClusterImpl.java | 15 +--- .../elasticsearch/ElasticSearchNodeImpl.java | 92 +++++++++----------- .../ElasticSearchNodeSshDriver.java | 2 +- 4 files changed, 44 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6572c027/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java index fea777e..a026d17 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchCluster.java @@ -2,9 +2,7 @@ package brooklyn.entity.nosql.elasticsearch; import brooklyn.entity.group.DynamicCluster; import brooklyn.entity.proxying.ImplementedBy; -import brooklyn.event.AttributeSensor; import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.Sensors; import brooklyn.util.flags.SetFromFlag; /** @@ -16,5 +14,5 @@ 
public interface ElasticSearchCluster extends DynamicCluster { BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, "elasticsearch.cluster.name", "Name of the ElasticSearch cluster", "BrooklynCluster"); - String getName(); + String getClusterName(); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6572c027/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java index fe60c4a..32f8706 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchClusterImpl.java @@ -1,26 +1,15 @@ package brooklyn.entity.nosql.elasticsearch; -import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import brooklyn.entity.Entity; import brooklyn.entity.group.DynamicClusterImpl; import brooklyn.entity.proxying.EntitySpec; -import brooklyn.location.Location; public class ElasticSearchClusterImpl extends DynamicClusterImpl implements ElasticSearchCluster { private AtomicInteger nextMemberId = new AtomicInteger(0); - - public ElasticSearchClusterImpl() { - - } - - @Override - public void start(Collection<? 
extends Location> locations) { - super.start(locations); - } - + @Override protected boolean calculateServiceUp() { boolean up = false; @@ -42,7 +31,7 @@ public class ElasticSearchClusterImpl extends DynamicClusterImpl implements Elas } @Override - public String getName() { + public String getClusterName() { return getConfig(CLUSTER_NAME); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6572c027/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java index ec01539..e625aff 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeImpl.java @@ -1,64 +1,66 @@ package brooklyn.entity.nosql.elasticsearch; import static com.google.common.base.Preconditions.checkNotNull; - -import java.net.URI; -import java.net.URISyntaxException; - -import org.apache.http.client.HttpClient; -import org.bouncycastle.util.Strings; - -import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.event.AttributeSensor; import brooklyn.event.feed.http.HttpFeed; import brooklyn.event.feed.http.HttpPollConfig; import brooklyn.event.feed.http.HttpValueFunctions; import brooklyn.event.feed.http.JsonFunctions; import brooklyn.location.access.BrooklynAccessUtils; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.http.HttpTool; +import brooklyn.util.guava.Functionals; +import brooklyn.util.guava.Maybe; +import brooklyn.util.guava.MaybeFunctions; +import brooklyn.util.guava.TypeTokens; import brooklyn.util.http.HttpToolResponse; import com.google.common.base.Function; import 
com.google.common.base.Functions; -import com.google.common.collect.ImmutableMap; import com.google.common.net.HostAndPort; import com.google.gson.JsonElement; public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements ElasticSearchNode { - HttpFeed httpFeed; + protected static final Function<Maybe<JsonElement>, Maybe<JsonElement>> GET_FIRST_NODE_FROM_NODES = new Function<Maybe<JsonElement>, Maybe<JsonElement>>() { + @Override public Maybe<JsonElement> apply(Maybe<JsonElement> input) { + if (input.isAbsent()) { + return input; + } + return Maybe.fromNullable(input.get().getAsJsonObject().entrySet().iterator().next().getValue()); + } + }; - public ElasticSearchNodeImpl() { - - } + protected static final Function<HttpToolResponse, Maybe<JsonElement>> GET_FIRST_NODE = Functionals.chain(HttpValueFunctions.jsonContents(), + MaybeFunctions.<JsonElement>wrap(), JsonFunctions.walkM("nodes"), GET_FIRST_NODE_FROM_NODES); + + + HttpFeed httpFeed; @Override public Class<ElasticSearchNodeDriver> getDriverInterface() { return ElasticSearchNodeDriver.class; } + protected static final <T> HttpPollConfig<T> getSensorFromNodeStat(AttributeSensor<T> sensor, String... 
jsonPath) { + return new HttpPollConfig<T>(sensor) + .onSuccess(Functionals.chain(GET_FIRST_NODE, JsonFunctions.walkM(jsonPath), JsonFunctions.castM(TypeTokens.getRawRawType(sensor.getTypeToken()), null))) + .onFailureOrException(Functions.<T>constant(null)); + } + @Override protected void connectSensors() { super.connectSensors(); Integer rawPort = getAttribute(HTTP_PORT); checkNotNull(rawPort, "HTTP_PORT sensors not set for %s; is an acceptable port available?", this); HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, rawPort); - Function<JsonElement, String> getNodeId = new Function<JsonElement, String>() { - @Override public String apply(JsonElement input) { - return input.getAsJsonObject().entrySet().iterator().next().getKey(); - } - }; - - Function<JsonElement, JsonElement> getFirstNodeFromNodes = new Function<JsonElement, JsonElement>() { - @Override public JsonElement apply(JsonElement input) { - return input.getAsJsonObject().entrySet().iterator().next().getValue(); + Function<Maybe<JsonElement>, String> getNodeId = new Function<Maybe<JsonElement>, String>() { + @Override public String apply(Maybe<JsonElement> input) { + if (input.isAbsent()) { + return null; + } + return input.get().getAsJsonObject().entrySet().iterator().next().getKey(); } }; - - Function<HttpToolResponse, JsonElement> getFirstNode = HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), - JsonFunctions.walk("nodes"), getFirstNodeFromNodes); - httpFeed = HttpFeed.builder() .entity(this) .period(1000) @@ -66,30 +68,16 @@ public class ElasticSearchNodeImpl extends SoftwareProcessImpl implements Elasti .poll(new HttpPollConfig<Boolean>(SERVICE_UP) .onSuccess(HttpValueFunctions.responseCodeEquals(200)) .onFailureOrException(Functions.constant(false))) - .poll(new HttpPollConfig<String>(NODE_ID) - .onSuccess(HttpValueFunctions.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walk("nodes"), getNodeId)) - .onFailureOrException(Functions.constant(""))) - 
.poll(new HttpPollConfig<String>(NODE_NAME) - .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("name"), JsonFunctions.cast(String.class))) - .onFailureOrException(Functions.<String>constant(null))) - .poll(new HttpPollConfig<Integer>(DOCUMENT_COUNT) - .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "docs", "count"), JsonFunctions.cast(Integer.class))) - .onFailureOrException(Functions.<Integer>constant(null))) - .poll(new HttpPollConfig<Integer>(STORE_BYTES) - .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "store", "size_in_bytes"), JsonFunctions.cast(Integer.class))) - .onFailureOrException(Functions.<Integer>constant(null))) - .poll(new HttpPollConfig<Integer>(GET_TOTAL) - .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "get", "total"), JsonFunctions.cast(Integer.class))) - .onFailureOrException(Functions.<Integer>constant(null))) - .poll(new HttpPollConfig<Integer>(GET_TIME_IN_MILLIS) - .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "get", "time_in_millis"), JsonFunctions.cast(Integer.class))) - .onFailureOrException(Functions.<Integer>constant(null))) - .poll(new HttpPollConfig<Integer>(SEARCH_QUERY_TOTAL) - .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "search", "query_total"), JsonFunctions.cast(Integer.class))) - .onFailureOrException(Functions.<Integer>constant(null))) - .poll(new HttpPollConfig<Integer>(SEARCH_QUERY_TIME_IN_MILLIS) - .onSuccess(HttpValueFunctions.chain(getFirstNode, JsonFunctions.walk("indices", "search", "query_time_in_millis"), JsonFunctions.cast(Integer.class))) - .onFailureOrException(Functions.<Integer>constant(null))) + .poll(new HttpPollConfig<String>(NODE_ID) + .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), MaybeFunctions.<JsonElement>wrap(), JsonFunctions.walkM("nodes"), getNodeId)) + .onFailureOrException(Functions.constant(""))) + 
.poll(getSensorFromNodeStat(NODE_NAME, "name")) + .poll(getSensorFromNodeStat(DOCUMENT_COUNT, "indices", "docs", "count")) + .poll(getSensorFromNodeStat(STORE_BYTES, "indices", "store", "size_in_bytes")) + .poll(getSensorFromNodeStat(GET_TOTAL, "indices", "get", "total")) + .poll(getSensorFromNodeStat(GET_TIME_IN_MILLIS, "indices", "get", "time_in_millis")) + .poll(getSensorFromNodeStat(SEARCH_QUERY_TOTAL, "indices", "search", "query_total")) + .poll(getSensorFromNodeStat(SEARCH_QUERY_TIME_IN_MILLIS, "indices", "search", "query_time_in_millis")) .poll(new HttpPollConfig<String>(CLUSTER_NAME) .onSuccess(HttpValueFunctions.jsonContents("cluster_name", String.class))) .build(); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6572c027/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java index 2dda090..748975b 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/elasticsearch/ElasticSearchNodeSshDriver.java @@ -32,8 +32,8 @@ public class ElasticSearchNodeSshDriver extends AbstractSoftwareProcessSshDriver String saveAs = resolver.getFilename(); List<String> commands = ImmutableList.<String>builder() - .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) .add(BashCommands.installJavaLatestOrFail()) + .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) .add(String.format("tar zxvf %s", saveAs)) .build();
