http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java new file mode 100644 index 0000000..e8cfc9f --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.http; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.net.URI; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.feed.AbstractFeed; +import org.apache.brooklyn.core.feed.AttributePollHandler; +import org.apache.brooklyn.core.feed.DelegatingPollHandler; +import org.apache.brooklyn.core.feed.Poller; +import org.apache.brooklyn.util.core.http.HttpTool; +import org.apache.brooklyn.util.core.http.HttpToolResponse; +import org.apache.brooklyn.util.core.http.HttpTool.HttpClientBuilder; +import org.apache.brooklyn.util.time.Duration; +import org.apache.http.auth.Credentials; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.HttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects; +import com.google.common.base.Optional; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; +import com.google.common.reflect.TypeToken; + +/** + * Provides a feed of attribute values, by polling over http. + * + * Example usage (e.g. in an entity that extends SoftwareProcessImpl): + * <pre> + * {@code + * private HttpFeed feed; + * + * //@Override + * protected void connectSensors() { + * super.connectSensors(); + * + * feed = HttpFeed.builder() + * .entity(this) + * .period(200) + * .baseUri(String.format("http://%s:%s/management/subsystem/web/connector/http/read-resource", host, port)) + * .baseUriVars(ImmutableMap.of("include-runtime","true")) + * .poll(new HttpPollConfig<Boolean>(SERVICE_UP) + * .onSuccess(HttpValueFunctions.responseCodeEquals(200)) + * .onError(Functions.constant(false))) + * .poll(new HttpPollConfig<Integer>(REQUEST_COUNT) + * .onSuccess(HttpValueFunctions.jsonContents("requestCount", Integer.class))) + * .build(); + * } + * + * {@literal @}Override + * protected void disconnectSensors() { + * super.disconnectSensors(); + * if (feed != null) feed.stop(); + * } + * } + * </pre> + * <p> + * + * This also supports giving a Supplier for the URL + * (e.g. {@link Entities#attributeSupplier(org.apache.brooklyn.api.entity.Entity, org.apache.brooklyn.api.event.AttributeSensor)}) + * from a sensor. Note however that if a Supplier-based sensor is *https*, + * https-specific initialization may not occur if the URL is not available at start time, + * and it may report errors if that sensor is not available. + * Some guidance for controlling enablement of a feed based on availability of a sensor + * can be seen in HttpLatencyDetector (in brooklyn-policy). + * + * @author aled + */ +public class HttpFeed extends AbstractFeed { + + public static final Logger log = LoggerFactory.getLogger(HttpFeed.class); + + @SuppressWarnings("serial") + public static final ConfigKey<SetMultimap<HttpPollIdentifier, HttpPollConfig<?>>> POLLS = ConfigKeys.newConfigKey( + new TypeToken<SetMultimap<HttpPollIdentifier, HttpPollConfig<?>>>() {}, + "polls"); + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private EntityLocal entity; + private boolean onlyIfServiceUp = false; + private Supplier<URI> baseUriProvider; + private Duration period = Duration.millis(500); + private List<HttpPollConfig<?>> polls = Lists.newArrayList(); + private URI baseUri; + private Map<String, String> baseUriVars = Maps.newLinkedHashMap(); + private Map<String, String> headers = Maps.newLinkedHashMap(); + private boolean suspended = false; + private Credentials credentials; + private String uniqueTag; + private volatile boolean built; + + public Builder entity(EntityLocal val) { + this.entity = val; + return this; + } + public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); } + public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { + this.onlyIfServiceUp = onlyIfServiceUp; + return this; + } + public Builder baseUri(Supplier<URI> val) { + if (baseUri!=null && val!=null) + throw new IllegalStateException("Builder cannot take both a URI and a URI Provider"); + this.baseUriProvider = val; + return this; + } + public Builder baseUri(URI val) { + if (baseUriProvider!=null && val!=null) + throw new IllegalStateException("Builder cannot take both a URI and a URI Provider"); + this.baseUri = val; + return this; + } + public Builder baseUrl(URL val) { + return baseUri(URI.create(val.toString())); + } + public Builder baseUri(String val) { + return baseUri(URI.create(val)); + } + public Builder baseUriVars(Map<String,String> vals) { + baseUriVars.putAll(vals); + return this; + } + public Builder baseUriVar(String key, String val) { + baseUriVars.put(key, val); + return this; + } + public Builder headers(Map<String,String> vals) { + headers.putAll(vals); + return this; + } + public Builder header(String key, String val) { + headers.put(key, val); + return this; + } + public Builder period(Duration duration) { + this.period = duration; + return this; + } + public Builder period(long millis) { + return period(millis, TimeUnit.MILLISECONDS); + } + public Builder period(long val, TimeUnit units) { + return period(Duration.of(val, units)); + } + public Builder poll(HttpPollConfig<?> config) { + polls.add(config); + return this; + } + public Builder suspended() { + return suspended(true); + } + public Builder suspended(boolean startsSuspended) { + this.suspended = startsSuspended; + return this; + } + public Builder credentials(String username, String password) { + this.credentials = new UsernamePasswordCredentials(username, password); + return this; + } + public Builder credentialsIfNotNull(String username, String password) { + if (username != null && password != null) { + this.credentials = new UsernamePasswordCredentials(username, password); + } + return this; + } + public Builder uniqueTag(String uniqueTag) { + this.uniqueTag = uniqueTag; + return this; + } + public HttpFeed build() { + built = true; + HttpFeed result = new HttpFeed(this); + result.setEntity(checkNotNull(entity, "entity")); + if (suspended) result.suspend(); + result.start(); + return result; + } + @Override + protected void finalize() { + if (!built) log.warn("HttpFeed.Builder created, but build() never called"); + } + } + + private static class HttpPollIdentifier { + final String method; + final Supplier<URI> uriProvider; + final Map<String,String> headers; + final byte[] body; + final Optional<Credentials> credentials; + final Duration connectionTimeout; + final Duration socketTimeout; + private HttpPollIdentifier(String method, Supplier<URI> uriProvider, Map<String, String> headers, byte[] body, + Optional<Credentials> credentials, Duration connectionTimeout, Duration socketTimeout) { + this.method = checkNotNull(method, "method").toLowerCase(); + this.uriProvider = checkNotNull(uriProvider, "uriProvider"); + this.headers = checkNotNull(headers, "headers"); + this.body = body; + this.credentials = checkNotNull(credentials, "credentials"); + this.connectionTimeout = connectionTimeout; + this.socketTimeout = socketTimeout; + + if (!(this.method.equals("get") || this.method.equals("post"))) { + throw new IllegalArgumentException("Unsupported HTTP method (only supports GET and POST): "+method); + } + if (body != null && method.equalsIgnoreCase("get")) { + throw new IllegalArgumentException("Must not set body for http GET method"); + } + } + + @Override + public int hashCode() { + return Objects.hashCode(method, uriProvider, headers, body, credentials); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof HttpPollIdentifier)) { + return false; + } + HttpPollIdentifier o = (HttpPollIdentifier) other; + return Objects.equal(method, o.method) && + Objects.equal(uriProvider, o.uriProvider) && + Objects.equal(headers, o.headers) && + Objects.equal(body, o.body) && + Objects.equal(credentials, o.credentials); + } + } + + /** + * For rebind; do not call directly; use builder + */ + public HttpFeed() { + } + + protected HttpFeed(Builder builder) { + setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp); + Map<String,String> baseHeaders = ImmutableMap.copyOf(checkNotNull(builder.headers, "headers")); + + SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> polls = HashMultimap.<HttpPollIdentifier,HttpPollConfig<?>>create(); + for (HttpPollConfig<?> config : builder.polls) { + if (!config.isEnabled()) continue; + @SuppressWarnings({ "unchecked", "rawtypes" }) + HttpPollConfig<?> configCopy = new HttpPollConfig(config); + if (configCopy.getPeriod() < 0) configCopy.period(builder.period); + String method = config.getMethod(); + Map<String,String> headers = config.buildHeaders(baseHeaders); + byte[] body = config.getBody(); + Duration connectionTimeout = config.getConnectionTimeout(); + Duration socketTimeout = config.getSocketTimeout(); + + Optional<Credentials> credentials = Optional.fromNullable(builder.credentials); + + Supplier<URI> baseUriProvider = builder.baseUriProvider; + if (builder.baseUri!=null) { + if (baseUriProvider!=null) + throw new IllegalStateException("Not permitted to supply baseUri and baseUriProvider"); + Map<String,String> baseUriVars = ImmutableMap.copyOf(checkNotNull(builder.baseUriVars, "baseUriVars")); + URI uri = config.buildUri(builder.baseUri, baseUriVars); + baseUriProvider = Suppliers.ofInstance(uri); + } else if (!builder.baseUriVars.isEmpty()) { + throw new IllegalStateException("Not permitted to supply URI vars when using a URI provider; pass the vars to the provider instead"); + } + checkNotNull(baseUriProvider); + + polls.put(new HttpPollIdentifier(method, baseUriProvider, headers, body, credentials, connectionTimeout, socketTimeout), configCopy); + } + setConfig(POLLS, polls); + initUniqueTag(builder.uniqueTag, polls.values()); + } + + @Override + protected void preStart() { + SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> polls = getConfig(POLLS); + + for (final HttpPollIdentifier pollInfo : polls.keySet()) { + // Though HttpClients are thread safe and can take advantage of connection pooling + // and authentication caching, the httpcomponents documentation says: + // "While HttpClient instances are thread safe and can be shared between multiple + // threads of execution, it is highly recommended that each thread maintains its + // own dedicated instance of HttpContext. + // http://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html + final HttpClient httpClient = createHttpClient(pollInfo); + + Set<HttpPollConfig<?>> configs = polls.get(pollInfo); + long minPeriod = Integer.MAX_VALUE; + Set<AttributePollHandler<? super HttpToolResponse>> handlers = Sets.newLinkedHashSet(); + + for (HttpPollConfig<?> config : configs) { + handlers.add(new AttributePollHandler<HttpToolResponse>(config, entity, this)); + if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); + } + + Callable<HttpToolResponse> pollJob; + + if (pollInfo.method.equals("get")) { + pollJob = new Callable<HttpToolResponse>() { + public HttpToolResponse call() throws Exception { + if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo); + return HttpTool.httpGet(httpClient, pollInfo.uriProvider.get(), pollInfo.headers); + }}; + } else if (pollInfo.method.equals("post")) { + pollJob = new Callable<HttpToolResponse>() { + public HttpToolResponse call() throws Exception { + if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo); + return HttpTool.httpPost(httpClient, pollInfo.uriProvider.get(), pollInfo.headers, pollInfo.body); + }}; + } else if (pollInfo.method.equals("head")) { + pollJob = new Callable<HttpToolResponse>() { + public HttpToolResponse call() throws Exception { + if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo); + return HttpTool.httpHead(httpClient, pollInfo.uriProvider.get(), pollInfo.headers); + }}; + } else { + throw new IllegalStateException("Unexpected http method: "+pollInfo.method); + } + + getPoller().scheduleAtFixedRate(pollJob, new DelegatingPollHandler<HttpToolResponse>(handlers), minPeriod); + } + } + + // TODO Should we really trustAll for https? Make configurable? + private HttpClient createHttpClient(HttpPollIdentifier pollIdentifier) { + URI uri = pollIdentifier.uriProvider.get(); + HttpClientBuilder builder = HttpTool.httpClientBuilder() + .trustAll() + .laxRedirect(true); + if (uri != null) builder.uri(uri); + if (uri != null) builder.credential(pollIdentifier.credentials); + if (pollIdentifier.connectionTimeout != null) { + builder.connectionTimeout(pollIdentifier.connectionTimeout); + } + if (pollIdentifier.socketTimeout != null) { + builder.socketTimeout(pollIdentifier.socketTimeout); + } + return builder.build(); + } + + @SuppressWarnings("unchecked") + protected Poller<HttpToolResponse> getPoller() { + return (Poller<HttpToolResponse>) super.getPoller(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java new file mode 100644 index 0000000..e019293 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollConfig.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.http; + +import java.net.URI; +import java.util.Map; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.feed.PollConfig; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.http.HttpTool; +import org.apache.brooklyn.util.core.http.HttpToolResponse; +import org.apache.brooklyn.util.time.Duration; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; + +public class HttpPollConfig<T> extends PollConfig<HttpToolResponse, T, HttpPollConfig<T>> { + + private String method = "GET"; + private String suburl = ""; + private Map<String, String> vars = ImmutableMap.<String,String>of(); + private Map<String, String> headers = ImmutableMap.<String,String>of(); + private byte[] body; + private Duration connectionTimeout; + private Duration socketTimeout; + + public static final Predicate<HttpToolResponse> DEFAULT_SUCCESS = new Predicate<HttpToolResponse>() { + @Override + public boolean apply(@Nullable HttpToolResponse input) { + return input != null && input.getResponseCode() >= 200 && input.getResponseCode() <= 399; + }}; + + public static <T> HttpPollConfig<T> forSensor(AttributeSensor<T> sensor) { + return new HttpPollConfig<T>(sensor); + } + + public static HttpPollConfig<Void> forMultiple() { + return new HttpPollConfig<Void>(PollConfig.NO_SENSOR); + } + + public HttpPollConfig(AttributeSensor<T> sensor) { + super(sensor); + super.checkSuccess(DEFAULT_SUCCESS); + } + + public HttpPollConfig(HttpPollConfig<T> other) { + super(other); + suburl = other.suburl; + vars = other.vars; + method = other.method; + headers = other.headers; + } + + public String getSuburl() { + return suburl; + } + + public Map<String, String> getVars() { + return vars; + } + + public Duration getConnectionTimeout() { + return connectionTimeout; + } + + public Duration getSocketTimeout() { + return socketTimeout; + } + + public String getMethod() { + return method; + } + + public byte[] getBody() { + return body; + } + + public HttpPollConfig<T> method(String val) { + this.method = val; return this; + } + + public HttpPollConfig<T> suburl(String val) { + this.suburl = val; return this; + } + + public HttpPollConfig<T> vars(Map<String,String> val) { + this.vars = val; return this; + } + + public HttpPollConfig<T> headers(Map<String,String> val) { + this.headers = val; return this; + } + + public HttpPollConfig<T> body(byte[] val) { + this.body = val; return this; + } + public HttpPollConfig<T> connectionTimeout(Duration val) { + this.connectionTimeout = val; + return this; + } + public HttpPollConfig<T> socketTimeout(Duration val) { + this.socketTimeout = val; + return this; + } + public URI buildUri(URI baseUri, Map<String,String> baseUriVars) { + String uri = (baseUri != null ? baseUri.toString() : "") + (suburl != null ? suburl : ""); + Map<String,String> allvars = concat(baseUriVars, vars); + + if (allvars != null && allvars.size() > 0) { + uri += "?" + HttpTool.encodeUrlParams(allvars); + } + + return URI.create(uri); + } + + public Map<String, String> buildHeaders(Map<String, String> baseHeaders) { + return MutableMap.<String,String>builder() + .putAll(baseHeaders) + .putAll(headers) + .build(); + } + + @SuppressWarnings("unchecked") + private <K,V> Map<K,V> concat(Map<? extends K,? extends V> map1, Map<? extends K,? extends V> map2) { + if (map1 == null || map1.isEmpty()) return (Map<K,V>) map2; + if (map2 == null || map2.isEmpty()) return (Map<K,V>) map1; + + // TODO Not using Immutable builder, because that fails if duplicates in map1 and map2 + return MutableMap.<K,V>builder().putAll(map1).putAll(map2).build(); + } + + @Override protected String toStringBaseName() { return "http"; } + @Override protected String toStringPollSource() { return suburl; } + @Override + protected MutableList<Object> equalsFields() { + return super.equalsFields().appendIfNotNull(method).appendIfNotNull(vars).appendIfNotNull(headers) + .appendIfNotNull(body).appendIfNotNull(connectionTimeout).appendIfNotNull(socketTimeout); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java new file mode 100644 index 0000000..5414cc4 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPollValue.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.http; + +import java.util.List; +import java.util.Map; + +import org.apache.brooklyn.util.core.http.HttpToolResponse; + +/** @deprecated since 0.7.0, use {@link HttpToolResponse}. + * the old {@link HttpPollValue} concrete class has been renamed {@link HttpToolResponse} + * because it has nothing specific to polls. this is now just a transitional interface. */ +@Deprecated +public interface HttpPollValue { + + public int getResponseCode(); + public String getReasonPhrase(); + public long getStartTime(); + public long getLatencyFullContent(); + public long getLatencyFirstResponse(); + public Map<String, List<String>> getHeaderLists(); + public byte[] getContent(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java new file mode 100644 index 0000000..10c3b3e --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpPolls.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.http; + +import java.net.URI; + +import org.apache.brooklyn.util.core.http.HttpTool; +import org.apache.brooklyn.util.core.http.HttpToolResponse; +import org.apache.http.impl.client.DefaultHttpClient; + +import com.google.common.collect.ImmutableMap; + +/** + * @deprecated since 0.7; use {@link HttpTool} + */ +@Deprecated +public class HttpPolls { + + public static HttpToolResponse executeSimpleGet(URI uri) { + return HttpTool.httpGet(new DefaultHttpClient(), uri, ImmutableMap.<String,String>of()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java new file mode 100644 index 0000000..75dab74 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpValueFunctions.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.http; + +import java.util.List; + +import org.apache.brooklyn.util.core.http.HttpToolResponse; +import org.apache.brooklyn.util.guava.Functionals; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Predicates; +import com.google.common.collect.Lists; +import com.google.gson.JsonElement; + +public class HttpValueFunctions { + + private HttpValueFunctions() {} // instead use static utility methods + + public static Function<HttpToolResponse, Integer> responseCode() { + return new ResponseCode(); + } + + /** @deprecated since 0.7.0; only here for deserialization of persisted state */ + private static Function<HttpToolResponse, Integer> responseCodeLegacy() { + return new Function<HttpToolResponse, Integer>() { + @Override public Integer apply(HttpToolResponse input) { + return input.getResponseCode(); + } + }; + } + + private static class ResponseCode implements Function<HttpToolResponse, Integer> { + @Override public Integer apply(HttpToolResponse input) { + return input.getResponseCode(); + } + } + + public static Function<HttpToolResponse, Boolean> responseCodeEquals(final int expected) { + return Functionals.chain(HttpValueFunctions.responseCode(), Functions.forPredicate(Predicates.equalTo(expected))); + } + + public static Function<HttpToolResponse, Boolean> responseCodeEquals(final int... expected) { + List<Integer> expectedList = Lists.newArrayList(); + for (int e : expected) { + expectedList.add((Integer)e); + } + return Functionals.chain(HttpValueFunctions.responseCode(), Functions.forPredicate(Predicates.in(expectedList))); + } + + public static Function<HttpToolResponse, String> stringContentsFunction() { + return new StringContents(); + } + + /** @deprecated since 0.7.0; only here for deserialization of persisted state */ + private static Function<HttpToolResponse, String> stringContentsFunctionLegacy() { + return new Function<HttpToolResponse, String>() { + @Override public String apply(HttpToolResponse input) { + return input.getContentAsString(); + } + }; + } + + private static class StringContents implements Function<HttpToolResponse, String> { + @Override public String apply(HttpToolResponse input) { + return input.getContentAsString(); + } + } + + public static Function<HttpToolResponse, JsonElement> jsonContents() { + return Functionals.chain(stringContentsFunction(), JsonFunctions.asJson()); + } + + public static <T> Function<HttpToolResponse, T> jsonContents(String element, Class<T> expected) { + return jsonContents(new String[] {element}, expected); + } + + public static <T> Function<HttpToolResponse, T> jsonContents(String[] elements, Class<T> expected) { + return Functionals.chain(jsonContents(), JsonFunctions.walk(elements), JsonFunctions.cast(expected)); + } + + public static <T> Function<HttpToolResponse, T> jsonContentsFromPath(String path){ + return Functionals.chain(jsonContents(), JsonFunctions.<T>getPath(path)); + } + + public static Function<HttpToolResponse, Long> latency() { + return new Latency(); + } + + /** @deprecated since 0.7.0; only here for deserialization of persisted state */ + private static Function<HttpToolResponse, Long> latencyLegacy() { + return new Function<HttpToolResponse, Long>() { + public Long apply(HttpToolResponse input) { + return input.getLatencyFullContent(); + } + }; + } + + private static class Latency implements Function<HttpToolResponse, Long> { + public Long apply(HttpToolResponse input) { + return input.getLatencyFullContent(); + } + }; + + public static Function<HttpToolResponse, Boolean> containsHeader(String header) { + return new ContainsHeader(header); + } + + private static class ContainsHeader implements Function<HttpToolResponse, Boolean> { + private final String header; + + public ContainsHeader(String header) { + this.header = header; + } + @Override + public Boolean apply(HttpToolResponse input) { + List<String> actual = input.getHeaderLists().get(header); + return actual != null && actual.size() > 0; + } + } + + + /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function)} */ @Deprecated + public static <A,B,C> Function<A,C> chain(final Function<A,? extends B> f1, final Function<B,C> f2) { + return Functionals.chain(f1, f2); + } + + /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function, Function)} */ @Deprecated + public static <A,B,C,D> Function<A,D> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,D> f3) { + return Functionals.chain(f1, f2, f3); + } + + /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function, Function, Function)} */ @Deprecated + public static <A,B,C,D,E> Function<A,E> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,? extends D> f3, final Function<D,E> f4) { + return Functionals.chain(f1, f2, f3, f4); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java b/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java new file mode 100644 index 0000000..a3e04cd --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/http/JsonFunctions.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.http; + +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.util.guava.Functionals; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.guava.MaybeFunctions; + +import com.google.common.base.Function; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.gson.*; +import com.jayway.jsonpath.JsonPath; + +public class JsonFunctions { + + private JsonFunctions() {} // instead use static utility methods + + public static Function<String, JsonElement> asJson() { + return new Function<String, JsonElement>() { + @Override public JsonElement apply(String input) { + return new JsonParser().parse(input); + } + }; + } + + public static <T> Function<JsonElement, List<T>> forEach(final Function<JsonElement, T> func) { + return new Function<JsonElement, List<T>>() { + @Override public List<T> apply(JsonElement input) { + JsonArray array = (JsonArray) input; + List<T> result = Lists.newArrayList(); + for (int i = 0; i < array.size(); i++) { + result.add(func.apply(array.get(i))); + } + return result; + } + }; + } + + + /** as {@link #walkM(Iterable)} taking a single string consisting of a dot separated path */ + public static Function<JsonElement, JsonElement> walk(String elementOrDotSeparatedElements) { + return walk( Splitter.on('.').split(elementOrDotSeparatedElements) ); + } + + /** as {@link #walkM(Iterable)} taking a series of strings (dot separators not respected here) */ + public static Function<JsonElement, JsonElement> walk(final String... elements) { + return walk(Arrays.asList(elements)); + } + + /** returns a function which traverses the supplied path of entries in a json object (maps of maps of maps...), + * @throws NoSuchElementException if any path is not present as a key in that map */ + public static Function<JsonElement, JsonElement> walk(final Iterable<String> elements) { + // could do this instead, pointing at Maybe for this, and for walkN, but it's slightly less efficient +// return Functionals.chain(MaybeFunctions.<JsonElement>wrap(), walkM(elements), MaybeFunctions.<JsonElement>get()); + + return new Function<JsonElement, JsonElement>() { + @Override public JsonElement apply(JsonElement input) { + JsonElement curr = input; + for (String element : elements) { + JsonObject jo = curr.getAsJsonObject(); + curr = jo.get(element); + if (curr==null) + throw new NoSuchElementException("No element '"+element+" in JSON, when walking "+elements); + } + return curr; + } + }; + } + + + /** as {@link #walk(String)} but if any element is not found it simply returns null */ + public static Function<JsonElement, JsonElement> walkN(@Nullable String elements) { + return walkN( Splitter.on('.').split(elements) ); + } + + /** as {@link #walk(String...))} but if any element is not found it simply returns null */ + public static Function<JsonElement, JsonElement> walkN(final String... elements) { + return walkN(Arrays.asList(elements)); + } + + /** as {@link #walk(Iterable))} but if any element is not found it simply returns null */ + public static Function<JsonElement, JsonElement> walkN(final Iterable<String> elements) { + return new Function<JsonElement, JsonElement>() { + @Override public JsonElement apply(JsonElement input) { + JsonElement curr = input; + for (String element : elements) { + if (curr==null) return null; + JsonObject jo = curr.getAsJsonObject(); + curr = jo.get(element); + } + return curr; + } + }; + } + + /** as {@link #walk(String))} and {@link #walk(Iterable)} */ + public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(@Nullable String elements) { + return walkM( Splitter.on('.').split(elements) ); + } + + /** as {@link #walk(String...))} and {@link #walk(Iterable)} */ + public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(final String... elements) { + return walkM(Arrays.asList(elements)); + } + + /** as {@link #walk(Iterable))} but working with objects which {@link Maybe} contain {@link JsonElement}, + * simply preserving a {@link Maybe#absent()} object if additional walks are requested upon it + * (cf jquery) */ + public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(final Iterable<String> elements) { + return new Function<Maybe<JsonElement>, Maybe<JsonElement>>() { + @Override public Maybe<JsonElement> apply(Maybe<JsonElement> input) { + Maybe<JsonElement> curr = input; + for (String element : elements) { + if (curr.isAbsent()) return curr; + JsonObject jo = curr.get().getAsJsonObject(); + JsonElement currO = jo.get(element); + if (currO==null) return Maybe.absent("No element '"+element+" in JSON, when walking "+elements); + curr = Maybe.of(currO); + } + return curr; + } + }; + } + + /** + * returns an element from a single json primitive value given a full path {@link com.jayway.jsonpath.JsonPath} + */ + public static <T> Function<JsonElement,T> getPath(final String path) { + return new Function<JsonElement, T>() { + @SuppressWarnings("unchecked") + @Override public T apply(JsonElement input) { + String jsonString = input.toString(); + Object rawElement = JsonPath.read(jsonString, path); + return (T) rawElement; + } + }; + } + + @SuppressWarnings("unchecked") + public static <T> Function<JsonElement, T> cast(final Class<T> expected) { + return new Function<JsonElement, T>() { + @Override public T apply(JsonElement input) { + if (input == null) { + return (T) null; + } else if (input.isJsonNull()) { + return (T) null; + } else if (expected == boolean.class || expected == Boolean.class) { + return (T) (Boolean) input.getAsBoolean(); + } else if (expected == char.class || expected == Character.class) { + return (T) (Character) input.getAsCharacter(); + } else if (expected == byte.class || expected == Byte.class) { + return (T) (Byte) input.getAsByte(); + } else if (expected == short.class || expected == Short.class) { + return (T) (Short) input.getAsShort(); + } else if (expected == int.class || expected == Integer.class) { + return (T) (Integer) input.getAsInt(); + } else if (expected == long.class || expected == Long.class) { + return (T) (Long) input.getAsLong(); + } else if (expected == float.class || expected == Float.class) { + return (T) (Float) input.getAsFloat(); + } else if (expected == double.class || expected == Double.class) { + return (T) (Double) input.getAsDouble(); + } else if (expected == BigDecimal.class) { + return (T) input.getAsBigDecimal(); + } else if (expected == BigInteger.class) { + return (T) input.getAsBigInteger(); + } else if (Number.class.isAssignableFrom(expected)) { + // TODO Will result in a class-cast if it's an unexpected sub-type of Number not handled above + return (T) input.getAsNumber(); + } else if (expected == String.class) { + return (T) input.getAsString(); + } else if (expected.isArray()) { + JsonArray array = input.getAsJsonArray(); + Class<?> componentType = expected.getComponentType(); + if (JsonElement.class.isAssignableFrom(componentType)) { + JsonElement[] result = new JsonElement[array.size()]; + for (int i = 0; i < array.size(); i++) { + result[i] = array.get(i); + } + return (T) result; + } else { + Object[] result = (Object[]) Array.newInstance(componentType, array.size()); + for (int i = 0; i < array.size(); i++) { + result[i] = cast(componentType).apply(array.get(i)); + } + return (T) result; + } + } else { + throw new IllegalArgumentException("Cannot cast json element to type "+expected); + } + } + }; + } + + public static <T> Function<Maybe<JsonElement>, T> castM(final Class<T> expected) { + return Functionals.chain(MaybeFunctions.<JsonElement>get(), cast(expected)); + } + + public static <T> Function<Maybe<JsonElement>, T> castM(final Class<T> expected, final T defaultValue) { + return new Function<Maybe<JsonElement>, T>() { + @Override + public T apply(Maybe<JsonElement> input) { + if (input.isAbsent()) return defaultValue; + return cast(expected).apply(input.get()); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java new file mode 100644 index 0000000..caeb21a --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.shell; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.mgmt.ExecutionContext; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.feed.AbstractFeed; +import org.apache.brooklyn.core.feed.AttributePollHandler; +import org.apache.brooklyn.core.feed.DelegatingPollHandler; +import org.apache.brooklyn.core.feed.Poller; +import org.apache.brooklyn.feed.function.FunctionFeed; +import org.apache.brooklyn.feed.ssh.SshFeed; +import org.apache.brooklyn.feed.ssh.SshPollValue; +import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory; +import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; +import org.apache.brooklyn.util.core.task.system.internal.SystemProcessTaskFactory.ConcreteSystemProcessTaskFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Objects; +import com.google.common.base.Optional; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; +import com.google.common.reflect.TypeToken; + +/** + * Provides a feed of attribute values, by executing shell commands (on the local machine where + * this instance of brooklyn is running). Useful e.g. for paas tools such as Cloud Foundry vmc + * which operate against a remote target. + * + * Example usage (e.g. in an entity that extends SoftwareProcessImpl): + * <pre> + * {@code + * private ShellFeed feed; + * + * //@Override + * protected void connectSensors() { + * super.connectSensors(); + * + * feed = ShellFeed.builder() + * .entity(this) + * .machine(mySshMachineLachine) + * .poll(new ShellPollConfig<Long>(DISK_USAGE) + * .command("df -P | grep /dev") + * .failOnNonZeroResultCode(true) + * .onSuccess(new Function<SshPollValue, Long>() { + * public Long apply(SshPollValue input) { + * String[] parts = input.getStdout().split("[ \\t]+"); + * return Long.parseLong(parts[2]); + * }})) + * .build(); + * } + * + * {@literal @}Override + * protected void disconnectSensors() { + * super.disconnectSensors(); + * if (feed != null) feed.stop(); + * } + * } + * </pre> + * + * @see SshFeed (to run on remote machines) + * @see FunctionFeed (for arbitrary functions) + * + * @author aled + */ +public class ShellFeed extends AbstractFeed { + + public static final Logger log = LoggerFactory.getLogger(ShellFeed.class); + + @SuppressWarnings("serial") + private static final ConfigKey<SetMultimap<ShellPollIdentifier, ShellPollConfig<?>>> POLLS = ConfigKeys.newConfigKey( + new TypeToken<SetMultimap<ShellPollIdentifier, ShellPollConfig<?>>>() {}, + "polls"); + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private EntityLocal entity; + private long period = 500; + private TimeUnit periodUnits = TimeUnit.MILLISECONDS; + private List<ShellPollConfig<?>> polls = Lists.newArrayList(); + private String uniqueTag; + private volatile boolean built; + + public Builder entity(EntityLocal val) { + this.entity = val; + return this; + } + public Builder period(long millis) { + return period(millis, TimeUnit.MILLISECONDS); + } + public Builder period(long val, TimeUnit units) { + this.period = val; + this.periodUnits = units; + return this; + } + public Builder poll(ShellPollConfig<?> config) { + polls.add(config); + return this; + } + public Builder uniqueTag(String uniqueTag) { + this.uniqueTag = uniqueTag; + return this; + } + public ShellFeed build() { + built = true; + ShellFeed result = new ShellFeed(this); + result.setEntity(checkNotNull(entity, "entity")); + result.start(); + return result; + } + @Override + protected void finalize() { + if (!built) log.warn("ShellFeed.Builder created, but build() never called"); + } + } + + private static class ShellPollIdentifier { + final String command; + final Map<String, String> env; + final File dir; + final String input; + final String context; + final long timeout; + + private ShellPollIdentifier(String command, Map<String, String> env, File dir, String input, String context, long timeout) { + this.command = checkNotNull(command, "command"); + this.env = checkNotNull(env, "env"); + this.dir = dir; + this.input = input; + this.context = checkNotNull(context, "context"); + this.timeout = timeout; + } + + @Override + public int hashCode() { + return Objects.hashCode(command, env, dir, input, timeout); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ShellPollIdentifier)) { + return false; + } + ShellPollIdentifier o = (ShellPollIdentifier) other; + return Objects.equal(command, o.command) && + Objects.equal(env, o.env) && + Objects.equal(dir, o.dir) && + Objects.equal(input, o.input) && + Objects.equal(timeout, o.timeout); + } + } + + /** + * For rebind; do not call directly; use builder + */ + public ShellFeed() { + } + + protected ShellFeed(Builder builder) { + super(); + + SetMultimap<ShellPollIdentifier, ShellPollConfig<?>> polls = HashMultimap.<ShellPollIdentifier,ShellPollConfig<?>>create(); + for (ShellPollConfig<?> config : builder.polls) { + if (!config.isEnabled()) continue; + @SuppressWarnings({ "unchecked", "rawtypes" }) + ShellPollConfig<?> configCopy = new ShellPollConfig(config); + if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); + String command = config.getCommand(); + Map<String, String> env = config.getEnv(); + File dir = config.getDir(); + String input = config.getInput(); + String context = config.getSensor().getName(); + long timeout = config.getTimeout(); + + polls.put(new ShellPollIdentifier(command, env, dir, input, context, timeout), configCopy); + } + setConfig(POLLS, polls); + initUniqueTag(builder.uniqueTag, polls.values()); + } + + @Override + protected void preStart() { + SetMultimap<ShellPollIdentifier, ShellPollConfig<?>> polls = getConfig(POLLS); + + for (final ShellPollIdentifier pollInfo : polls.keySet()) { + Set<ShellPollConfig<?>> configs = polls.get(pollInfo); + long minPeriod = Integer.MAX_VALUE; + Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet(); + + for (ShellPollConfig<?> config : configs) { + handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this)); + if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); + } + + final ProcessTaskFactory<?> taskFactory = newTaskFactory(pollInfo.command, pollInfo.env, pollInfo.dir, + pollInfo.input, pollInfo.context, pollInfo.timeout); + final ExecutionContext executionContext = ((EntityInternal) entity).getManagementSupport().getExecutionContext(); + + getPoller().scheduleAtFixedRate( + new Callable<SshPollValue>() { + @Override public SshPollValue call() throws Exception { + ProcessTaskWrapper<?> taskWrapper = taskFactory.newTask(); + executionContext.submit(taskWrapper); + taskWrapper.block(); + Optional<Integer> exitCode = Optional.fromNullable(taskWrapper.getExitCode()); + return new SshPollValue(null, exitCode.or(-1), taskWrapper.getStdout(), taskWrapper.getStderr()); + }}, + new DelegatingPollHandler<SshPollValue>(handlers), + minPeriod); + } + } + + @SuppressWarnings("unchecked") + protected Poller<SshPollValue> getPoller() { + return (Poller<SshPollValue>) super.getPoller(); + } + + /** + * Executes the given command (using `bash -l -c $command`, so as to have a good path set). + * + * @param command The command to execute + * @param env Environment variable settings, in format name=value + * @param dir Working directory, or null to inherit from current process + * @param input Input to send to the command (if not null) + */ + protected ProcessTaskFactory<?> newTaskFactory(final String command, Map<String,String> env, File dir, String input, final String summary, final long timeout) { + // FIXME Add generic timeout() support to task ExecutionManager + if (timeout > 0) { + log.warn("Timeout ({}ms) not currently supported for ShellFeed {}", timeout, this); + } + + return new ConcreteSystemProcessTaskFactory<Object>(command) + .environmentVariables(env) + .loginShell(true) + .directory(dir) + .runAsCommand() + .summary(summary); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java new file mode 100644 index 0000000..e1147c3 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellPollConfig.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.shell; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.File; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.feed.PollConfig; +import org.apache.brooklyn.feed.ssh.SshPollValue; +import org.apache.brooklyn.util.collections.MutableList; + +import com.google.common.base.Predicate; +import com.google.common.collect.Maps; + +public class ShellPollConfig<T> extends PollConfig<SshPollValue, T, ShellPollConfig<T>> { + + private String command; + private Map<String,String> env = Maps.newLinkedHashMap(); + private long timeout = -1; + private File dir; + private String input; + + public static final Predicate<SshPollValue> DEFAULT_SUCCESS = new Predicate<SshPollValue>() { + @Override + public boolean apply(@Nullable SshPollValue input) { + return input != null && input.getExitStatus() == 0; + }}; + + public ShellPollConfig(AttributeSensor<T> sensor) { + super(sensor); + super.checkSuccess(DEFAULT_SUCCESS); + } + + public ShellPollConfig(ShellPollConfig<T> other) { + super(other); + command = other.command; + env = other.env; + timeout = other.timeout; + dir = other.dir; + input = other.input; + } + + public String getCommand() { + return command; + } + + public Map<String, String> getEnv() { + return env; + } + + public File getDir() { + return dir; + } + + public String getInput() { + return input; + } + + public long getTimeout() { + return timeout; + } + + public ShellPollConfig<T> command(String val) { + this.command = val; + return this; + } + + public ShellPollConfig<T> env(String key, String val) { + env.put(checkNotNull(key, "key"), checkNotNull(val, "val")); + return this; + } + + public ShellPollConfig<T> env(Map<String,String> val) { + for (Map.Entry<String, String> entry : checkNotNull(val, "map").entrySet()) { + env(entry.getKey(), entry.getValue()); + } + return this; + } + + public ShellPollConfig<T> dir(File val) { + this.dir = val; + return this; + } + + public ShellPollConfig<T> input(String val) { + this.input = val; + return this; + } + + public ShellPollConfig<T> timeout(long timeout) { + return timeout(timeout, TimeUnit.MILLISECONDS); + } + + public ShellPollConfig<T> timeout(long timeout, TimeUnit units) { + this.timeout = units.toMillis(timeout); + return this; + } + + @Override protected String toStringBaseName() { return "shell"; } + @Override protected String toStringPollSource() { return command; } + @Override protected MutableList<Object> equalsFields() { return super.equalsFields().appendIfNotNull(command); } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java new file mode 100644 index 0000000..8663137 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshFeed.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.ssh; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.feed.AbstractFeed; +import org.apache.brooklyn.core.feed.AttributePollHandler; +import org.apache.brooklyn.core.feed.DelegatingPollHandler; +import org.apache.brooklyn.core.feed.Poller; +import org.apache.brooklyn.core.location.Locations; +import org.apache.brooklyn.core.location.Machines; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.location.ssh.SshMachineLocation; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.internal.ssh.SshTool; +import org.apache.brooklyn.util.time.Duration; + +import com.google.common.base.Objects; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; +import com.google.common.reflect.TypeToken; + +/** + * Provides a feed of attribute values, by polling over ssh. + * + * Example usage (e.g. in an entity that extends SoftwareProcessImpl): + * <pre> + * {@code + * private SshFeed feed; + * + * //@Override + * protected void connectSensors() { + * super.connectSensors(); + * + * feed = SshFeed.builder() + * .entity(this) + * .machine(mySshMachineLachine) + * .poll(new SshPollConfig<Boolean>(SERVICE_UP) + * .command("rabbitmqctl -q status") + * .onSuccess(new Function<SshPollValue, Boolean>() { + * public Boolean apply(SshPollValue input) { + * return (input.getExitStatus() == 0); + * }})) + * .build(); + * } + * + * {@literal @}Override + * protected void disconnectSensors() { + * super.disconnectSensors(); + * if (feed != null) feed.stop(); + * } + * } + * </pre> + * + * @author aled + */ +public class SshFeed extends AbstractFeed { + + public static final Logger log = LoggerFactory.getLogger(SshFeed.class); + + @SuppressWarnings("serial") + public static final ConfigKey<Supplier<SshMachineLocation>> MACHINE = ConfigKeys.newConfigKey( + new TypeToken<Supplier<SshMachineLocation>>() {}, + "machine"); + + public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand"); + + @SuppressWarnings("serial") + public static final ConfigKey<SetMultimap<SshPollIdentifier, SshPollConfig<?>>> POLLS = ConfigKeys.newConfigKey( + new TypeToken<SetMultimap<SshPollIdentifier, SshPollConfig<?>>>() {}, + "polls"); + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private EntityLocal entity; + private boolean onlyIfServiceUp = false; + private Supplier<SshMachineLocation> machine; + private Duration period = Duration.of(500, TimeUnit.MILLISECONDS); + private List<SshPollConfig<?>> polls = Lists.newArrayList(); + private boolean execAsCommand = false; + private String uniqueTag; + private volatile boolean built; + + public Builder entity(EntityLocal val) { + this.entity = val; + return this; + } + public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); } + public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { + this.onlyIfServiceUp = onlyIfServiceUp; + return this; + } + /** optional, to force a machine; otherwise it is inferred from the entity */ + public Builder machine(SshMachineLocation val) { return machine(Suppliers.ofInstance(val)); } + /** optional, to force a machine; otherwise it is inferred from the entity */ + public Builder machine(Supplier<SshMachineLocation> val) { + this.machine = val; + return this; + } + public Builder period(Duration period) { + this.period = period; + return this; + } + public Builder period(long millis) { + return period(Duration.of(millis, TimeUnit.MILLISECONDS)); + } + public Builder period(long val, TimeUnit units) { + return period(Duration.of(val, units)); + } + public Builder poll(SshPollConfig<?> config) { + polls.add(config); + return this; + } + public Builder execAsCommand() { + execAsCommand = true; + return this; + } + public Builder execAsScript() { + execAsCommand = false; + return this; + } + public Builder uniqueTag(String uniqueTag) { + this.uniqueTag = uniqueTag; + return this; + } + public SshFeed build() { + built = true; + SshFeed result = new SshFeed(this); + result.setEntity(checkNotNull(entity, "entity")); + result.start(); + return result; + } + @Override + protected void finalize() { + if (!built) log.warn("SshFeed.Builder created, but build() never called"); + } + } + + private static class SshPollIdentifier { + final Supplier<String> command; + final Supplier<Map<String, String>> env; + + private SshPollIdentifier(Supplier<String> command, Supplier<Map<String, String>> env) { + this.command = checkNotNull(command, "command"); + this.env = checkNotNull(env, "env"); + } + + @Override + public int hashCode() { + return Objects.hashCode(command, env); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SshPollIdentifier)) { + return false; + } + SshPollIdentifier o = (SshPollIdentifier) other; + return Objects.equal(command, o.command) && + Objects.equal(env, o.env); + } + } + + /** @deprecated since 0.7.0, use static convenience on {@link Locations} */ + @Deprecated + public static SshMachineLocation getMachineOfEntity(Entity entity) { + return Machines.findUniqueSshMachineLocation(entity.getLocations()).orNull(); + } + + /** + * For rebind; do not call directly; use builder + */ + public SshFeed() { + } + + protected SshFeed(final Builder builder) { + setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp); + setConfig(MACHINE, builder.machine != null ? builder.machine : null); + setConfig(EXEC_AS_COMMAND, builder.execAsCommand); + + SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = HashMultimap.<SshPollIdentifier,SshPollConfig<?>>create(); + for (SshPollConfig<?> config : builder.polls) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + SshPollConfig<?> configCopy = new SshPollConfig(config); + if (configCopy.getPeriod() < 0) configCopy.period(builder.period); + polls.put(new SshPollIdentifier(config.getCommandSupplier(), config.getEnvSupplier()), configCopy); + } + setConfig(POLLS, polls); + initUniqueTag(builder.uniqueTag, polls.values()); + } + + protected SshMachineLocation getMachine() { + Supplier<SshMachineLocation> supplier = getConfig(MACHINE); + if (supplier != null) { + return supplier.get(); + } else { + return Locations.findUniqueSshMachineLocation(entity.getLocations()).get(); + } + } + + @Override + protected void preStart() { + SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = getConfig(POLLS); + + for (final SshPollIdentifier pollInfo : polls.keySet()) { + Set<SshPollConfig<?>> configs = polls.get(pollInfo); + long minPeriod = Integer.MAX_VALUE; + Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet(); + + for (SshPollConfig<?> config : configs) { + handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this)); + if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); + } + + getPoller().scheduleAtFixedRate( + new Callable<SshPollValue>() { + public SshPollValue call() throws Exception { + return exec(pollInfo.command.get(), pollInfo.env.get()); + }}, + new DelegatingPollHandler<SshPollValue>(handlers), + minPeriod); + } + } + + @SuppressWarnings("unchecked") + protected Poller<SshPollValue> getPoller() { + return (Poller<SshPollValue>) super.getPoller(); + } + + private SshPollValue exec(String command, Map<String,String> env) throws IOException { + SshMachineLocation machine = getMachine(); + Boolean execAsCommand = getConfig(EXEC_AS_COMMAND); + if (log.isTraceEnabled()) log.trace("Ssh polling for {}, executing {} with env {}", new Object[] {machine, command, env}); + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + + int exitStatus; + ConfigBag flags = ConfigBag.newInstance() + .configure(SshTool.PROP_NO_EXTRA_OUTPUT, true) + .configure(SshTool.PROP_OUT_STREAM, stdout) + .configure(SshTool.PROP_ERR_STREAM, stderr); + if (Boolean.TRUE.equals(execAsCommand)) { + exitStatus = machine.execCommands(flags.getAllConfig(), + "ssh-feed", ImmutableList.of(command), env); + } else { + exitStatus = machine.execScript(flags.getAllConfig(), + "ssh-feed", ImmutableList.of(command), env); + } + + return new SshPollValue(machine, exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java new file mode 100644 index 0000000..8fec87f --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollConfig.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.ssh; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.feed.PollConfig; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; + +public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<T>> { + + private Supplier<String> commandSupplier; + private List<Supplier<Map<String,String>>> dynamicEnvironmentSupplier = MutableList.of(); + + public static final Predicate<SshPollValue> DEFAULT_SUCCESS = new Predicate<SshPollValue>() { + @Override + public boolean apply(@Nullable SshPollValue input) { + return input != null && input.getExitStatus() == 0; + }}; + + public SshPollConfig(AttributeSensor<T> sensor) { + super(sensor); + super.checkSuccess(DEFAULT_SUCCESS); + } + + public SshPollConfig(SshPollConfig<T> other) { + super(other); + commandSupplier = other.commandSupplier; + } + + /** @deprecated since 0.7.0; use {@link #getCommandSupplier()} and resolve just-in-time */ + public String getCommand() { + return getCommandSupplier().get(); + } + public Supplier<String> getCommandSupplier() { + return commandSupplier; + } + + /** @deprecated since 0.7.0; use {@link #getEnvSupplier()} and resolve just-in-time */ + public Map<String, String> getEnv() { + return getEnvSupplier().get(); + } + public Supplier<Map<String,String>> getEnvSupplier() { + return new Supplier<Map<String,String>>() { + @Override + public Map<String, String> get() { + Map<String,String> result = MutableMap.of(); + for (Supplier<Map<String, String>> envS: dynamicEnvironmentSupplier) { + if (envS!=null) { + Map<String, String> envM = envS.get(); + if (envM!=null) { + mergeEnvMaps(envM, result); + } + } + } + return result; + } + }; + } + + protected void mergeEnvMaps(Map<String,String> supplied, Map<String,String> target) { + if (supplied==null) return; + // as the value is a string there is no need to look at deep merge behaviour + target.putAll(supplied); + } + + public SshPollConfig<T> command(String val) { return command(Suppliers.ofInstance(val)); } + public SshPollConfig<T> command(Supplier<String> val) { + this.commandSupplier = val; + return this; + } + + /** add the given env param; sequence is as per {@link #env(Supplier)} */ + public SshPollConfig<T> env(String key, String val) { + return env(Collections.singletonMap(key, val)); + } + + /** add the given env params; sequence is as per {@link #env(Supplier)}. + * behaviour is undefined if the map supplied here is subsequently changed. + * <p> + * if a map's contents might change, use {@link #env(Supplier)} */ + public SshPollConfig<T> env(Map<String,String> val) { + if (val==null) return this; + return env(Suppliers.ofInstance(val)); + } + + /** + * adds the given dynamic supplier of environment variables. + * <p> + * use of a supplier allows env vars to be computed on each execution, + * for example to take the most recent sensor values. + * <p> + * in the case of multiple map suppliers, static maps, or static {@link #env(String, String)} + * key value pairs, the order in which they are specified here is the order + * in which they are computed and applied. + **/ + public SshPollConfig<T> env(Supplier<Map<String,String>> val) { + Preconditions.checkNotNull(val); + dynamicEnvironmentSupplier.add(val); + return this; + } + + @Override protected String toStringBaseName() { return "ssh"; } + @Override protected Object toStringPollSource() { + if (getCommandSupplier()==null) return null; + String command = getCommandSupplier().get(); + return command; + } + @Override protected MutableList<Object> equalsFields() { + return super.equalsFields() + .appendIfNotNull(getCommandSupplier()!=null ? getCommandSupplier().get() : null) + .appendIfNotNull(getEnvSupplier()!=null ? getEnvSupplier().get() : null); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java new file mode 100644 index 0000000..af0a8a6 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshPollValue.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.ssh; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.location.ssh.SshMachineLocation; + +public class SshPollValue { + + private final SshMachineLocation machine; + private final int exitStatus; + private final String stdout; + private final String stderr; + + public SshPollValue(SshMachineLocation machine, int exitStatus, String stdout, String stderr) { + this.machine = machine; + this.exitStatus = exitStatus; + this.stdout = stdout; + this.stderr = stderr; + } + + /** The machine the command will run on. */ + public SshMachineLocation getMachine() { + return machine; + } + + /** Command exit status, or -1 if error is set. */ + public int getExitStatus() { + return exitStatus; + } + + /** Command standard output; may be null if no content available. */ + @Nullable + public String getStdout() { + return stdout; + } + + /** Command standard error; may be null if no content available. */ + @Nullable + public String getStderr() { + return stderr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java new file mode 100644 index 0000000..370c3ce --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/feed/ssh/SshValueFunctions.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.feed.ssh; + +import javax.annotation.Nullable; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Predicates; + +public class SshValueFunctions { + + public static Function<SshPollValue, Integer> exitStatus() { + return new Function<SshPollValue, Integer>() { + @Override public Integer apply(SshPollValue input) { + return input.getExitStatus(); + } + }; + } + + public static Function<SshPollValue, String> stdout() { + return new Function<SshPollValue, String>() { + @Override public String apply(SshPollValue input) { + return input.getStdout(); + } + }; + } + + public static Function<SshPollValue, String> stderr() { + return new Function<SshPollValue, String>() { + @Override public String apply(SshPollValue input) { + return input.getStderr(); + } + }; + } + + public static Function<SshPollValue, Boolean> exitStatusEquals(final int expected) { + return chain(SshValueFunctions.exitStatus(), Functions.forPredicate(Predicates.equalTo(expected))); + } + + // TODO Do we want these chain methods? Does guava have them already? Duplicated in HttpValueFunctions. + public static <A,B,C> Function<A,C> chain(final Function<A,? extends B> f1, final Function<B,C> f2) { + return new Function<A,C>() { + @Override public C apply(@Nullable A input) { + return f2.apply(f1.apply(input)); + } + }; + } + + public static <A,B,C,D> Function<A,D> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,D> f3) { + return new Function<A,D>() { + @Override public D apply(@Nullable A input) { + return f3.apply(f2.apply(f1.apply(input))); + } + }; + } +}
