Moved sensor feed for machine metrics to an EntityInitializer - Refactor MachineEntity to use new initializer - Configure enrichers for memory deltas - Update FeedSupport in AbstractEntity - Fix typos in AutoScalerPolicy
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/cc6c2a95 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/cc6c2a95 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/cc6c2a95 Branch: refs/heads/master Commit: cc6c2a952f35b9bc6cd317e021f8369344fce759 Parents: 7e54dfd Author: Andrew Donald Kennedy <[email protected]> Authored: Thu Jun 16 08:12:45 2016 +0100 Committer: Andrew Donald Kennedy <[email protected]> Committed: Wed Jun 22 12:56:36 2016 -0700 ---------------------------------------------------------------------- .../brooklyn/core/entity/AbstractEntity.java | 50 ++++-- .../brooklyn/core/entity/EntityInternal.java | 19 ++- .../policy/autoscaling/AutoScalerPolicy.java | 4 +- .../entity/machine/AddMachineMetrics.java | 155 +++++++++++++++++++ .../entity/machine/MachineAttributes.java | 1 + .../brooklyn/entity/machine/MachineEntity.java | 8 - .../entity/machine/MachineEntityImpl.java | 97 ++---------- .../software/base/SoftwareProcessImpl.java | 66 ++++---- .../machine/MachineEntityEc2LiveTest.java | 12 +- 9 files changed, 255 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cc6c2a95/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java index e38343e..9c6cfcf 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java @@ -221,7 +221,7 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E private Collection<AbstractPolicy> policiesInternal = 
Lists.newCopyOnWriteArrayList(); private Collection<AbstractEnricher> enrichersInternal = Lists.newCopyOnWriteArrayList(); - Collection<Feed> feeds = Lists.newCopyOnWriteArrayList(); + private Collection<Feed> feedsInternal = Lists.newCopyOnWriteArrayList(); // FIXME we do not currently support changing parents, but to implement a cluster that can shrink we need to support at least // orphaning (i.e. removing ownership). This flag notes if the entity has previously had a parent, and if an attempt is made to @@ -249,6 +249,8 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E private final BasicGroupSupport groups = new BasicGroupSupport(); + private final BasicFeedSupport feeds = new BasicFeedSupport(); + /** * The config values of this entity. Updating this map should be done * via getConfig/setConfig. @@ -1664,9 +1666,8 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E * {@link BasicPolicySupport}. */ @Beta - // TODO revert to private when config() is reverted to return SensorSupportInternal public class BasicPolicySupport implements PolicySupportInternal { - + @Override public Iterator<Policy> iterator() { return asList().iterator(); @@ -1692,11 +1693,11 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E LOG.debug("Removing "+old+" when adding "+policy+" to "+AbstractEntity.this); remove(old); } - + CatalogUtils.setCatalogItemIdOnAddition(AbstractEntity.this, policy); policiesInternal.add((AbstractPolicy)policy); ((AbstractPolicy)policy).setEntity(AbstractEntity.this); - + getManagementSupport().getEntityChangeListener().onPolicyAdded(policy); sensors().emit(AbstractEntity.POLICY_ADDED, new PolicyDescriptor(policy)); } @@ -1707,19 +1708,19 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E add(policy); return policy; } - + @Override public boolean remove(Policy policy) { ((AbstractPolicy)policy).destroy(); boolean changed = 
policiesInternal.remove(policy); - + if (changed) { getManagementSupport().getEntityChangeListener().onPolicyRemoved(policy); sensors().emit(AbstractEntity.POLICY_REMOVED, new PolicyDescriptor(policy)); } return changed; } - + @Override public boolean removeAllPolicies() { boolean changed = false; @@ -1737,7 +1738,6 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E * {@link BasicEnricherSupport}. */ @Beta - // TODO revert to private when config() is reverted to return SensorSupportInternal public class BasicEnricherSupport implements EnricherSupportInternal { @Override public Iterator<Enricher> iterator() { @@ -1976,12 +1976,12 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E */ @Override public <T extends Feed> T addFeed(T feed) { - return feeds().addFeed(feed); + return feeds().add(feed); } @Override public FeedSupport feeds() { - return new BasicFeedSupport(); + return feeds; } @Override @@ -1989,16 +1989,22 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E public FeedSupport getFeedSupport() { return feeds(); } - + protected class BasicFeedSupport implements FeedSupport { + @Override public Collection<Feed> getFeeds() { - return ImmutableList.<Feed>copyOf(feeds); + return ImmutableList.<Feed>copyOf(feedsInternal); + } + + @Override + public <T extends Feed> T add(T feed) { + return addFeed(feed); } @Override public <T extends Feed> T addFeed(T feed) { - Feed old = findApparentlyEqualAndWarnIfNotSameUniqueTag(feeds, feed); + Feed old = findApparentlyEqualAndWarnIfNotSameUniqueTag(feedsInternal, feed); if (old != null) { if (old == feed) { if (!BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_FEED_REGISTRATION_PROPERTY)) { @@ -2013,7 +2019,7 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E } CatalogUtils.setCatalogItemIdOnAddition(AbstractEntity.this, feed); - feeds.add(feed); + feedsInternal.add(feed); if 
(!AbstractEntity.this.equals(((AbstractFeed)feed).getEntity())) ((AbstractFeed)feed).setEntity(AbstractEntity.this); @@ -2025,9 +2031,14 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E } @Override + public boolean remove(Feed feed) { + return removeFeed(feed); + } + + @Override public boolean removeFeed(Feed feed) { feed.stop(); - boolean changed = feeds.remove(feed); + boolean changed = feedsInternal.remove(feed); if (changed) { getManagementContext().getRebindManager().getChangeListener().onUnmanaged(feed); @@ -2037,9 +2048,14 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E } @Override + public boolean removeAll() { + return removeAllFeeds(); + } + + @Override public boolean removeAllFeeds() { boolean changed = false; - for (Feed feed : feeds) { + for (Feed feed : feedsInternal) { changed = removeFeed(feed) || changed; } return changed; http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cc6c2a95/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java b/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java index 51ac9eb..933390f 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/EntityInternal.java @@ -222,24 +222,37 @@ public interface EntityInternal extends BrooklynObjectInternal, EntityLocal, Reb } public interface FeedSupport { + Collection<Feed> getFeeds(); - + /** * Adds the given feed to this entity. The feed will automatically be re-added on brooklyn restart. */ + <T extends Feed> T add(T feed); + + /** @deprecated since 0.10.0; use {@link #add(Feed)} */ + @Deprecated <T extends Feed> T addFeed(T feed); - + /** * Removes the given feed from this entity. 
* @return True if the feed existed at this entity; false otherwise */ + boolean remove(Feed feed); + + /** @deprecated since 0.10.0; use {@link #remove(Feed)} */ + @Deprecated boolean removeFeed(Feed feed); - + /** * Removes all feeds from this entity. * Use with caution as some entities automatically register feeds; this will remove those feeds as well. * @return True if any feeds existed at this entity; false otherwise */ + boolean removeAll(); + + /** @deprecated since 0.10.0; use {@link #removeAll()} */ + @Deprecated boolean removeAllFeeds(); } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cc6c2a95/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java index b484359..87c67e9 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java @@ -151,10 +151,10 @@ public class AutoScalerPolicy extends AbstractPolicy { this.resizeUpIterationMax = val; return this; } public Builder resizeDownIterationIncrement(Integer val) { - this.resizeUpIterationIncrement = val; return this; + this.resizeDownIterationIncrement = val; return this; } public Builder resizeDownIterationMax(Integer val) { - this.resizeUpIterationMax = val; return this; + this.resizeDownIterationMax = val; return this; } public Builder minPeriodBetweenExecs(Duration val) { http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cc6c2a95/software/base/src/main/java/org/apache/brooklyn/entity/machine/AddMachineMetrics.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/AddMachineMetrics.java 
b/software/base/src/main/java/org/apache/brooklyn/entity/machine/AddMachineMetrics.java new file mode 100644 index 0000000..e118b93 --- /dev/null +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/AddMachineMetrics.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.machine; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.base.CharMatcher; +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Splitter; +import com.google.common.collect.FluentIterable; +import com.google.common.primitives.Doubles; + +import org.apache.brooklyn.api.entity.EntityInitializer; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.sensor.EnricherSpec; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.enricher.stock.YamlRollingTimeWindowMeanEnricher; +import org.apache.brooklyn.enricher.stock.YamlTimeWeightedDeltaEnricher; +import org.apache.brooklyn.entity.software.base.SoftwareProcess; +import org.apache.brooklyn.feed.ssh.SshFeed; +import org.apache.brooklyn.feed.ssh.SshPollConfig; +import org.apache.brooklyn.feed.ssh.SshPollValue; +import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.Duration; + +/** + * Adds a {@link SshFeed feed} with sensors returning details about the machine the entity is running on. + * <p> + * The machine must be SSHable and running Linux. 
+ * + * @since 0.10.0 + */ +@Beta +public class AddMachineMetrics implements EntityInitializer { + + private static final Logger LOG = LoggerFactory.getLogger(AddMachineMetrics.class); + + static { + MachineAttributes.init(); + } + + @Override + public void apply(EntityLocal entity) { + SshFeed machineMetricsFeed = createMachineMetricsFeed(entity); + ((EntityInternal) entity).feeds().add(machineMetricsFeed); + addMachineMetricsEnrichers(entity); + LOG.info("Configured machine metrics feed and enrichers on {}", entity); + } + + public static void addMachineMetricsEnrichers(EntityLocal entity) { + entity.enrichers().add(EnricherSpec.create(YamlTimeWeightedDeltaEnricher.class) + .configure(YamlTimeWeightedDeltaEnricher.SOURCE_SENSOR, MachineAttributes.USED_MEMORY) + .configure(YamlTimeWeightedDeltaEnricher.TARGET_SENSOR, MachineAttributes.USED_MEMORY_DELTA_PER_SECOND_LAST)); + entity.enrichers().add(EnricherSpec.create(YamlRollingTimeWindowMeanEnricher.class) + .configure(YamlRollingTimeWindowMeanEnricher.SOURCE_SENSOR, MachineAttributes.USED_MEMORY_DELTA_PER_SECOND_LAST) + .configure(YamlRollingTimeWindowMeanEnricher.TARGET_SENSOR, MachineAttributes.USED_MEMORY_DELTA_PER_SECOND_IN_WINDOW)); + } + + public static SshFeed createMachineMetricsFeed(EntityLocal entity) { + boolean retrieveUsageMetrics = entity.config().get(SoftwareProcess.RETRIEVE_USAGE_METRICS); + return SshFeed.builder() + .period(Duration.THIRTY_SECONDS) + .entity(entity) + .poll(SshPollConfig.forSensor(MachineAttributes.UPTIME) + .command("cat /proc/uptime") + .enabled(retrieveUsageMetrics) + .onFailureOrException(Functions.<Duration>constant(null)) + .onSuccess(new Function<SshPollValue, Duration>() { + @Override + public Duration apply(SshPollValue input) { + return Duration.seconds(Double.valueOf(Strings.getFirstWord(input.getStdout()))); + } + })) + .poll(SshPollConfig.forSensor(MachineAttributes.LOAD_AVERAGE) + .command("uptime") + .enabled(retrieveUsageMetrics) + 
.onFailureOrException(Functions.<Double>constant(null)) + .onSuccess(new Function<SshPollValue, Double>() { + @Override + public Double apply(SshPollValue input) { + String loadAverage = Strings.getFirstWordAfter(input.getStdout(), "load average:").replace(",", ""); + return Double.valueOf(loadAverage); + } + })) + .poll(SshPollConfig.forSensor(MachineAttributes.CPU_USAGE) + .command("ps -A -o pcpu") + .enabled(retrieveUsageMetrics) + .onFailureOrException(Functions.<Double>constant(null)) + .onSuccess(new Function<SshPollValue, Double>() { + @Override + public Double apply(SshPollValue input) { + Double cpu = 0d; + Iterable<String> stdout = Splitter.on(CharMatcher.BREAKING_WHITESPACE).omitEmptyStrings().split(input.getStdout()); + for (Double each : FluentIterable.from(stdout).skip(1).transform(Doubles.stringConverter())) { cpu += each; } + return cpu / 100d; + } + })) + .poll(SshPollConfig.forSensor(MachineAttributes.USED_MEMORY) + .command("free | grep Mem:") + .enabled(retrieveUsageMetrics) + .onFailureOrException(Functions.<Long>constant(null)) + .onSuccess(new Function<SshPollValue, Long>() { + @Override + public Long apply(SshPollValue input) { + List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); + return Long.parseLong(memoryData.get(2)); + } + })) + .poll(SshPollConfig.forSensor(MachineAttributes.FREE_MEMORY) + .command("free | grep Mem:") + .enabled(retrieveUsageMetrics) + .onFailureOrException(Functions.<Long>constant(null)) + .onSuccess(new Function<SshPollValue, Long>() { + @Override + public Long apply(SshPollValue input) { + List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); + return Long.parseLong(memoryData.get(3)); + } + })) + .poll(SshPollConfig.forSensor(MachineAttributes.TOTAL_MEMORY) + .command("free | grep Mem:") + .enabled(retrieveUsageMetrics) + .onFailureOrException(Functions.<Long>constant(null)) + 
.onSuccess(new Function<SshPollValue, Long>() { + @Override + public Long apply(SshPollValue input) { + List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); + return Long.parseLong(memoryData.get(1)); + } + })) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cc6c2a95/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineAttributes.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineAttributes.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineAttributes.java index 1a7ac9b..ff3ef86 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineAttributes.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineAttributes.java @@ -52,6 +52,7 @@ public class MachineAttributes { public static final AttributeSensor<Long> FREE_MEMORY = Sensors.newLongSensor("machine.memory.free", "Current free memory"); public static final AttributeSensor<Long> TOTAL_MEMORY = Sensors.newLongSensor("machine.memory.total", "Total memory"); public static final AttributeSensor<Long> USED_MEMORY = Sensors.newLongSensor("machine.memory.used", "Current memory usage"); + public static final AttributeSensor<Double> USED_MEMORY_DELTA_PER_SECOND_LAST = Sensors.newDoubleSensor("memory.used.delta", "Change in memory usage per second"); public static final AttributeSensor<Double> USED_MEMORY_DELTA_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("memory.used.windowed", "Average change in memory usage over 30s"); http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cc6c2a95/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntity.java ---------------------------------------------------------------------- diff --git 
a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntity.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntity.java index d3a1254..6a169c4 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntity.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntity.java @@ -20,7 +20,6 @@ package org.apache.brooklyn.entity.machine; import org.apache.brooklyn.api.catalog.Catalog; import org.apache.brooklyn.api.entity.ImplementedBy; -import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.core.annotation.Effector; import org.apache.brooklyn.core.annotation.EffectorParam; import org.apache.brooklyn.core.effector.MethodEffector; @@ -31,13 +30,6 @@ import org.apache.brooklyn.util.time.Duration; @ImplementedBy(MachineEntityImpl.class) public interface MachineEntity extends EmptySoftwareProcess { - AttributeSensor<Duration> UPTIME = MachineAttributes.UPTIME; - AttributeSensor<Double> LOAD_AVERAGE = MachineAttributes.LOAD_AVERAGE; - AttributeSensor<Double> CPU_USAGE = MachineAttributes.CPU_USAGE; - AttributeSensor<Long> FREE_MEMORY = MachineAttributes.FREE_MEMORY; - AttributeSensor<Long> TOTAL_MEMORY = MachineAttributes.TOTAL_MEMORY; - AttributeSensor<Long> USED_MEMORY = MachineAttributes.USED_MEMORY; - MethodEffector<String> EXEC_COMMAND = new MethodEffector<String>(MachineEntity.class, "execCommand"); MethodEffector<String> EXEC_COMMAND_TIMEOUT = new MethodEffector<String>(MachineEntity.class, "execCommandTimeout"); http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cc6c2a95/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntityImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntityImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntityImpl.java index 87bbc72..dda1c71 
100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntityImpl.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineEntityImpl.java @@ -18,39 +18,29 @@ */ package org.apache.brooklyn.entity.machine; -import java.util.List; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.brooklyn.core.effector.ssh.SshEffectorTasks; import org.apache.brooklyn.core.location.Machines; import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver; import org.apache.brooklyn.entity.software.base.EmptySoftwareProcessDriver; import org.apache.brooklyn.entity.software.base.EmptySoftwareProcessImpl; import org.apache.brooklyn.feed.ssh.SshFeed; -import org.apache.brooklyn.feed.ssh.SshPollConfig; -import org.apache.brooklyn.feed.ssh.SshPollValue; import org.apache.brooklyn.location.ssh.SshMachineLocation; import org.apache.brooklyn.util.core.task.DynamicTasks; import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.time.Duration; -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Splitter; - public class MachineEntityImpl extends EmptySoftwareProcessImpl implements MachineEntity { private static final Logger LOG = LoggerFactory.getLogger(MachineEntityImpl.class); - static { - MachineAttributes.init(); - } - - private transient SshFeed sensorFeed; + private transient SshFeed machineMetricsFeed; @Override public void init() { @@ -61,82 +51,17 @@ public class MachineEntityImpl extends EmptySoftwareProcessImpl implements Machi @Override protected void connectSensors() { super.connectSensors(); - - // Sensors linux-specific - if (!getMachine().getMachineDetails().getOsDetails().isLinux()) return; 
- - sensorFeed = SshFeed.builder() - .entity(this) - .period(Duration.THIRTY_SECONDS) - .poll(new SshPollConfig<Duration>(UPTIME) - .command("cat /proc/uptime") - .onFailureOrException(Functions.<Duration>constant(null)) - .onSuccess(new Function<SshPollValue, Duration>() { - @Override - public Duration apply(SshPollValue input) { - return Duration.seconds( Double.valueOf( Strings.getFirstWord(input.getStdout()) ) ); - } - })) - .poll(new SshPollConfig<Double>(LOAD_AVERAGE) - .command("uptime") - .onFailureOrException(Functions.constant(-1d)) - .onSuccess(new Function<SshPollValue, Double>() { - @Override - public Double apply(SshPollValue input) { - String loadAverage = Strings.getFirstWordAfter(input.getStdout(), "load average:").replace(",", ""); - return Double.valueOf(loadAverage); - } - })) - .poll(new SshPollConfig<Double>(CPU_USAGE) - .command("cat /proc/stat") - .onFailureOrException(Functions.constant(-1d)) - .onSuccess(new Function<SshPollValue, Double>() { - @Override - public Double apply(SshPollValue input) { - List<String> cpuData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); - Integer system = Integer.parseInt(cpuData.get(1)); - Integer user = Integer.parseInt(cpuData.get(3)); - Integer idle = Integer.parseInt(cpuData.get(4)); - return (double) (system + user) / (double) (system + user + idle); - } - })) - .poll(new SshPollConfig<Long>(USED_MEMORY) - .command("free | grep Mem:") - .onFailureOrException(Functions.constant(-1L)) - .onSuccess(new Function<SshPollValue, Long>() { - @Override - public Long apply(SshPollValue input) { - List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); - return Long.parseLong(memoryData.get(2)); - } - })) - .poll(new SshPollConfig<Long>(FREE_MEMORY) - .command("free | grep Mem:") - .onFailureOrException(Functions.constant(-1L)) - .onSuccess(new Function<SshPollValue, Long>() { - @Override - public Long 
apply(SshPollValue input) { - List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); - return Long.parseLong(memoryData.get(3)); - } - })) - .poll(new SshPollConfig<Long>(TOTAL_MEMORY) - .command("free | grep Mem:") - .onFailureOrException(Functions.constant(-1L)) - .onSuccess(new Function<SshPollValue, Long>() { - @Override - public Long apply(SshPollValue input) { - List<String> memoryData = Splitter.on(" ").omitEmptyStrings().splitToList(Strings.getFirstLine(input.getStdout())); - return Long.parseLong(memoryData.get(1)); - } - })) - .build(); - + Maybe<SshMachineLocation> location = Machines.findUniqueMachineLocation(getLocations(), SshMachineLocation.class); + if (location.isPresent() && location.get().getOsDetails().isLinux()) { + machineMetricsFeed = AddMachineMetrics.createMachineMetricsFeed(this); + AddMachineMetrics.addMachineMetricsEnrichers(this); + } else { + LOG.warn("Not adding machine metrics feed as no suitable location available on entity"); + } } - @Override public void disconnectSensors() { - if (sensorFeed != null) sensorFeed.stop(); + if (machineMetricsFeed != null) machineMetricsFeed.stop(); super.disconnectSensors(); } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cc6c2a95/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java index c4acf01..7397e0d 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java @@ -27,7 +27,14 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.Callable; 
import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Functions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityLocal; @@ -36,12 +43,10 @@ import org.apache.brooklyn.api.entity.drivers.EntityDriverManager; import org.apache.brooklyn.api.location.Location; import org.apache.brooklyn.api.location.MachineLocation; import org.apache.brooklyn.api.location.MachineProvisioningLocation; -import org.apache.brooklyn.api.location.PortRange; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.EnricherSpec; import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.enricher.AbstractEnricher; import org.apache.brooklyn.core.entity.AbstractEntity; import org.apache.brooklyn.core.entity.Attributes; @@ -53,13 +58,13 @@ import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic; import org.apache.brooklyn.core.location.LocationConfigKeys; import org.apache.brooklyn.core.location.cloud.CloudLocationConfig; +import org.apache.brooklyn.entity.machine.MachineAttributes; import org.apache.brooklyn.feed.function.FunctionFeed; import org.apache.brooklyn.feed.function.FunctionPollConfig; import org.apache.brooklyn.location.ssh.SshMachineLocation; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.core.task.DynamicTasks; import 
org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; @@ -67,15 +72,6 @@ import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.time.CountdownTimer; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Functions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import com.google.common.reflect.TypeToken; /** * An {@link Entity} representing a piece of software which can be installed, run, and controlled. @@ -85,15 +81,16 @@ import com.google.common.reflect.TypeToken; * It exposes sensors for service state (Lifecycle) and status (String), and for host info, log file location. */ public abstract class SoftwareProcessImpl extends AbstractEntity implements SoftwareProcess, DriverDependentEntity { - private static final Logger log = LoggerFactory.getLogger(SoftwareProcessImpl.class); - + + private static final Logger LOG = LoggerFactory.getLogger(SoftwareProcessImpl.class); + private transient SoftwareProcessDriver driver; /** @see #connectServiceUpIsRunning() */ - private volatile FunctionFeed serviceProcessIsRunning; + private transient FunctionFeed serviceProcessIsRunning; protected boolean connectedSensors = false; - + public SoftwareProcessImpl() { super(MutableMap.of(), null); } @@ -340,7 +337,7 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft // whereas on start the *driver* calls connectSensors, before calling postStart, // ie waiting for the entity truly to be started before calling postStart; // TODO feels like that confusion could be eliminated with a single place for pre/post logic!) 
- log.debug("disconnecting sensors for "+this+" in entity.preStop"); + LOG.debug("disconnecting sensors for "+this+" in entity.preStop"); disconnectSensors(); // Must set the serviceProcessIsRunning explicitly to false - we've disconnected the sensors @@ -387,18 +384,18 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft connectSensors(); } else { long delay = (long) (Math.random() * configuredMaxDelay.toMilliseconds()); - log.debug("Scheduled reconnection of sensors on {} in {}ms", this, delay); + LOG.debug("Scheduled reconnection of sensors on {} in {}ms", this, delay); Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { try { if (getManagementSupport().isNoLongerManaged()) { - log.debug("Entity {} no longer managed; ignoring scheduled connect sensors on rebind", SoftwareProcessImpl.this); + LOG.debug("Entity {} no longer managed; ignoring scheduled connect sensors on rebind", SoftwareProcessImpl.this); return; } connectSensors(); } catch (Throwable e) { - log.warn("Problem connecting sensors on rebind of "+SoftwareProcessImpl.this, e); + LOG.warn("Problem connecting sensors on rebind of "+SoftwareProcessImpl.this, e); Exceptions.propagateIfFatal(e); } } @@ -439,28 +436,27 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft //Only if the expected state is ON_FIRE then the entity has permanently failed. 
Transition expectedState = getAttribute(SERVICE_STATE_EXPECTED); if (expectedState == null || expectedState.getState() != Lifecycle.RUNNING) { - log.warn("On rebind of {}, not calling software process rebind hooks because expected state is {}", this, expectedState); + LOG.warn("On rebind of {}, not calling software process rebind hooks because expected state is {}", this, expectedState); return; } Lifecycle actualState = getAttribute(SERVICE_STATE_ACTUAL); if (actualState == null || actualState != Lifecycle.RUNNING) { - log.warn("Rebinding entity {}, even though actual state is {}. Expected state is {}", new Object[] {this, actualState, expectedState}); + LOG.warn("Rebinding entity {}, even though actual state is {}. Expected state is {}", new Object[] { this, actualState, expectedState }); } // e.g. rebinding to a running instance // FIXME For rebind, what to do about things in STARTING or STOPPING state? // FIXME What if location not set? - log.info("Rebind {} connecting to pre-running service", this); + LOG.info("Rebind {} connecting to pre-running service", this); MachineLocation machine = getMachineOrNull(); if (machine != null) { initDriver(machine); driver.rebind(); - if (log.isDebugEnabled()) log.debug("On rebind of {}, re-created driver {}", this, driver); + LOG.debug("On rebind of {}, re-created driver {}", this, driver); } else { - log.info("On rebind of {}, no MachineLocation found (with locations {}) so not generating driver", - this, getLocations()); + LOG.info("On rebind of {}, no MachineLocation found (with locations {}) so not generating driver", this, getLocations()); } callRebindHooks(); @@ -491,7 +487,7 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft if (raw2.isPresentAndNonNull()) { Object pp = raw2.get(); if (!(pp instanceof Map)) { - log.debug("When obtaining provisioning properties for "+this+" to deploy to "+location+", detected that coercion was needed, so coercing sooner than we would otherwise"); + 
LOG.debug("When obtaining provisioning properties for "+this+" to deploy to "+location+", detected that coercion was needed, so coercing sooner than we would otherwise"); pp = config().get(PROVISIONING_PROPERTIES); } result.putAll((Map<?,?>)pp); @@ -548,7 +544,7 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft if ((driver instanceof AbstractSoftwareProcessDriver) && machine.equals(((AbstractSoftwareProcessDriver)driver).getLocation())) { return driver; //just reuse } else { - log.warn("driver/location change is untested for {} at {}; changing driver and continuing", this, machine); + LOG.warn("driver/location change is untested for {} at {}; changing driver and continuing", this, machine); return newDriver(machine); } } else { @@ -558,7 +554,7 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft // TODO Find a better way to detect early death of process. public void waitForEntityStart() { - if (log.isDebugEnabled()) log.debug("waiting to ensure {} doesn't abort prematurely", this); + LOG.debug("waiting to ensure {} doesn't abort prematurely", this); Duration startTimeout = getConfig(START_TIMEOUT); CountdownTimer timer = startTimeout.countdownTimer(); boolean isRunningResult = false; @@ -568,7 +564,7 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft Time.sleep(delay); try { isRunningResult = driver.isRunning(); - if (log.isDebugEnabled()) log.debug("checked {}, 'is running' returned: {}", this, isRunningResult); + LOG.debug("checked {}, 'is running' returned: {}", this, isRunningResult); } catch (Exception e) { Exceptions.propagateIfFatal(e); @@ -576,13 +572,13 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft if (driver != null) { String msg = "checked " + this + ", 'is running' threw an exception; logging subsequent exceptions at debug level"; if (firstFailure == null) { - log.error(msg, e); + LOG.error(msg, e); } else { - 
log.debug(msg, e); + LOG.debug(msg, e); } } else { // provide extra context info, as we're seeing this happen in strange circumstances - log.error(this+" concurrent start and shutdown detected", e); + LOG.error(this+" concurrent start and shutdown detected", e); } if (firstFailure == null) { firstFailure = e; @@ -598,7 +594,7 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft if (firstFailure != null) { msg += "; check failed at least once with exception: " + firstFailure.getMessage() + ", see logs for details"; } - log.warn(msg+" (throwing)"); + LOG.warn(msg+" (throwing)"); ServiceStateLogic.setExpectedState(this, Lifecycle.RUNNING); throw new IllegalStateException(msg, firstFailure); } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cc6c2a95/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityEc2LiveTest.java ---------------------------------------------------------------------- diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityEc2LiveTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityEc2LiveTest.java index 9c4e571..2ecdf9a 100644 --- a/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityEc2LiveTest.java +++ b/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityEc2LiveTest.java @@ -40,12 +40,12 @@ public class MachineEntityEc2LiveTest extends AbstractEc2LiveTest { Asserts.succeedsEventually(new Runnable() { @Override public void run() { - assertNotNull(server.getAttribute(MachineEntity.UPTIME)); - assertNotNull(server.getAttribute(MachineEntity.LOAD_AVERAGE)); - assertNotNull(server.getAttribute(MachineEntity.CPU_USAGE)); - assertNotNull(server.getAttribute(MachineEntity.FREE_MEMORY)); - assertNotNull(server.getAttribute(MachineEntity.TOTAL_MEMORY)); - assertNotNull(server.getAttribute(MachineEntity.USED_MEMORY)); + 
assertNotNull(server.getAttribute(MachineAttributes.UPTIME)); + assertNotNull(server.getAttribute(MachineAttributes.LOAD_AVERAGE)); + assertNotNull(server.getAttribute(MachineAttributes.CPU_USAGE)); + assertNotNull(server.getAttribute(MachineAttributes.FREE_MEMORY)); + assertNotNull(server.getAttribute(MachineAttributes.TOTAL_MEMORY)); + assertNotNull(server.getAttribute(MachineAttributes.USED_MEMORY)); }}); String result = server.execCommand("MY_ENV=myval && echo start $MY_ENV");
