add an "only if service up" mode to several sensor feeds (the ones most likely to be configured ahead of time), and just-in-time machine inference for ssh-feeds, with a test
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/4e3415f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/4e3415f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/4e3415f9 Branch: refs/heads/master Commit: 4e3415f9014b44b002b433031bfb732720d9fdfb Parents: eb7da4f Author: Alex Heneveld <alex.henev...@cloudsoftcorp.com> Authored: Mon Jun 2 19:50:53 2014 +0100 Committer: Alex Heneveld <alex.henev...@cloudsoftcorp.com> Committed: Tue Jun 24 21:59:14 2014 +0100 ---------------------------------------------------------------------- .../java/brooklyn/event/feed/AbstractFeed.java | 7 +- .../java/brooklyn/event/feed/FeedConfig.java | 9 +++ .../main/java/brooklyn/event/feed/Poller.java | 16 ++++- .../event/feed/function/FunctionFeed.java | 8 ++- .../java/brooklyn/event/feed/http/HttpFeed.java | 8 ++- .../java/brooklyn/event/feed/ssh/SshFeed.java | 50 +++++++++----- .../java/brooklyn/event/feed/PollerTest.java | 2 +- .../event/feed/ssh/SshFeedIntegrationTest.java | 68 +++++++++++++++++++- .../brooklyn/entity/chef/ChefAttributeFeed.java | 8 ++- 9 files changed, 150 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4e3415f9/core/src/main/java/brooklyn/event/feed/AbstractFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/event/feed/AbstractFeed.java b/core/src/main/java/brooklyn/event/feed/AbstractFeed.java index 498409f..066f65f 100644 --- a/core/src/main/java/brooklyn/event/feed/AbstractFeed.java +++ b/core/src/main/java/brooklyn/event/feed/AbstractFeed.java @@ -26,13 +26,16 @@ public abstract class AbstractFeed { protected final EntityLocal entity; protected final Poller<?> poller; - private volatile Long lastConnectionTime; private volatile boolean 
activated, suspended; private final Object pollerStateMutex = new Object(); public AbstractFeed(EntityLocal entity) { + this(entity, false); + } + + public AbstractFeed(EntityLocal entity, boolean onlyIfServiceUp) { this.entity = checkNotNull(entity, "entity"); - this.poller = new Poller<Object>(entity); + this.poller = new Poller<Object>(entity, onlyIfServiceUp); } /** true if everything has been _started_ (or it is starting) but not stopped, http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4e3415f9/core/src/main/java/brooklyn/event/feed/FeedConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/event/feed/FeedConfig.java b/core/src/main/java/brooklyn/event/feed/FeedConfig.java index 34178e9..fce0ac9 100644 --- a/core/src/main/java/brooklyn/event/feed/FeedConfig.java +++ b/core/src/main/java/brooklyn/event/feed/FeedConfig.java @@ -82,6 +82,15 @@ public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> { this.checkSuccess = checkNotNull(val, "checkSuccess"); return self(); } + /** as {@link #checkSuccess(Predicate)} */ + public F checkSuccess(final Function<? super V,Boolean> val) { + return checkSuccess(new Predicate<V>() { + @Override + public boolean apply(V input) { + return val.apply(input); + } + }); + } public F onSuccess(Function<? 
super V,T> val) { this.onsuccess = checkNotNull(val, "onSuccess"); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4e3415f9/core/src/main/java/brooklyn/event/feed/Poller.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/event/feed/Poller.java b/core/src/main/java/brooklyn/event/feed/Poller.java index 9065c1c..2ee112c 100644 --- a/core/src/main/java/brooklyn/event/feed/Poller.java +++ b/core/src/main/java/brooklyn/event/feed/Poller.java @@ -7,6 +7,7 @@ import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.BrooklynTaskTags; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.EntityInternal; @@ -32,6 +33,7 @@ public class Poller<V> { public static final Logger log = LoggerFactory.getLogger(Poller.class); private final EntityLocal entity; + private final boolean onlyIfServiceUp; private final Set<Callable<?>> oneOffJobs = new LinkedHashSet<Callable<?>>(); private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>(); private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>(); @@ -74,8 +76,14 @@ public class Poller<V> { } } + /** @deprecated since 0.7.0, pass in whether should run onlyIfServiceUp */ + @Deprecated public Poller(EntityLocal entity) { + this(entity, false); + } + public Poller(EntityLocal entity, boolean onlyIfServiceUp) { this.entity = entity; + this.onlyIfServiceUp = onlyIfServiceUp; } /** Submits a one-off poll job; recommended that callers supply to-String so that task has a decent description */ @@ -121,7 +129,13 @@ public class Poller<V> { Callable<Task<?>> pollingTaskFactory = new Callable<Task<?>>() { public Task<?> call() { DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), - new Callable<Void>() { public Void call() { 
pollJob.wrappedJob.run(); return null; } } ); + new Callable<Void>() { public Void call() { + if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) { + return null; + } + pollJob.wrappedJob.run(); + return null; + } } ); BrooklynTaskTags.setTransient(task); return task; } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4e3415f9/core/src/main/java/brooklyn/event/feed/function/FunctionFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/event/feed/function/FunctionFeed.java b/core/src/main/java/brooklyn/event/feed/function/FunctionFeed.java index 5f9f3b5..c40a7ba 100644 --- a/core/src/main/java/brooklyn/event/feed/function/FunctionFeed.java +++ b/core/src/main/java/brooklyn/event/feed/function/FunctionFeed.java @@ -68,6 +68,7 @@ public class FunctionFeed extends AbstractFeed { public static class Builder { private EntityLocal entity; + private boolean onlyIfServiceUp = false; private long period = 500; private TimeUnit periodUnits = TimeUnit.MILLISECONDS; private List<FunctionPollConfig<?,?>> polls = Lists.newArrayList(); @@ -77,6 +78,11 @@ public class FunctionFeed extends AbstractFeed { this.entity = val; return this; } + public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); } + public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { + this.onlyIfServiceUp = onlyIfServiceUp; + return this; + } public Builder period(Duration d) { return period(d.toMilliseconds(), TimeUnit.MILLISECONDS); } @@ -126,7 +132,7 @@ public class FunctionFeed extends AbstractFeed { private final SetMultimap<FunctionPollIdentifier, FunctionPollConfig<?,?>> polls = HashMultimap.<FunctionPollIdentifier,FunctionPollConfig<?,?>>create(); protected FunctionFeed(Builder builder) { - super(builder.entity); + super(builder.entity, builder.onlyIfServiceUp); for (FunctionPollConfig<?,?> config : builder.polls) { FunctionPollConfig<?,?> configCopy = new 
FunctionPollConfig(config); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4e3415f9/core/src/main/java/brooklyn/event/feed/http/HttpFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/event/feed/http/HttpFeed.java b/core/src/main/java/brooklyn/event/feed/http/HttpFeed.java index 42a69d8..a26afb5 100644 --- a/core/src/main/java/brooklyn/event/feed/http/HttpFeed.java +++ b/core/src/main/java/brooklyn/event/feed/http/HttpFeed.java @@ -92,6 +92,7 @@ public class HttpFeed extends AbstractFeed { public static class Builder { private EntityLocal entity; + private boolean onlyIfServiceUp = false; private Supplier<URI> baseUriProvider; private Duration period = Duration.millis(500); private List<HttpPollConfig<?>> polls = Lists.newArrayList(); @@ -106,6 +107,11 @@ public class HttpFeed extends AbstractFeed { this.entity = val; return this; } + public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); } + public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { + this.onlyIfServiceUp = onlyIfServiceUp; + return this; + } public Builder baseUri(Supplier<URI> val) { if (baseUri!=null && val!=null) throw new IllegalStateException("Builder cannot take both a URI and a URI Provider"); @@ -233,7 +239,7 @@ public class HttpFeed extends AbstractFeed { private final SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> polls = HashMultimap.<HttpPollIdentifier,HttpPollConfig<?>>create(); protected HttpFeed(Builder builder) { - super(builder.entity); + super(builder.entity, builder.onlyIfServiceUp); Map<String,String> baseHeaders = ImmutableMap.copyOf(checkNotNull(builder.headers, "headers")); for (HttpPollConfig<?> config : builder.polls) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4e3415f9/core/src/main/java/brooklyn/event/feed/ssh/SshFeed.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/brooklyn/event/feed/ssh/SshFeed.java b/core/src/main/java/brooklyn/event/feed/ssh/SshFeed.java index 74e6d16..55816a6 100644 --- a/core/src/main/java/brooklyn/event/feed/ssh/SshFeed.java +++ b/core/src/main/java/brooklyn/event/feed/ssh/SshFeed.java @@ -19,17 +19,17 @@ import brooklyn.event.feed.AbstractFeed; import brooklyn.event.feed.AttributePollHandler; import brooklyn.event.feed.DelegatingPollHandler; import brooklyn.event.feed.Poller; -import brooklyn.location.Location; +import brooklyn.location.basic.Locations; import brooklyn.location.basic.Machines; import brooklyn.location.basic.SshMachineLocation; import brooklyn.util.collections.MutableMap; -import brooklyn.util.guava.Maybe; import brooklyn.util.time.Duration; import com.google.common.base.Objects; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; @@ -78,7 +78,8 @@ public class SshFeed extends AbstractFeed { public static class Builder { private EntityLocal entity; - private SshMachineLocation machine; + private boolean onlyIfServiceUp = false; + private Supplier<SshMachineLocation> machine; private long period = 500; private TimeUnit periodUnits = TimeUnit.MILLISECONDS; private List<SshPollConfig<?>> polls = Lists.newArrayList(); @@ -89,7 +90,15 @@ public class SshFeed extends AbstractFeed { this.entity = val; return this; } - public Builder machine(SshMachineLocation val) { + public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); } + public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { + this.onlyIfServiceUp = onlyIfServiceUp; + return this; + } + /** optional, to force a machine; otherwise it is inferred from the entity */ + public Builder machine(SshMachineLocation val) { 
return machine(Suppliers.ofInstance(val)); } + /** optional, to force a machine; otherwise it is inferred from the entity */ + public Builder machine(Supplier<SshMachineLocation> val) { this.machine = val; return this; } @@ -153,24 +162,31 @@ public class SshFeed extends AbstractFeed { } } - private final SshMachineLocation machine; + private final Supplier<SshMachineLocation> machine; private final boolean execAsCommand; // Treat as immutable once built private final SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = HashMultimap.<SshPollIdentifier,SshPollConfig<?>>create(); + /** @deprecated since 0.7.0, use static convenience on {@link Locations} */ + @Deprecated public static SshMachineLocation getMachineOfEntity(Entity entity) { - Maybe<SshMachineLocation> maybe = Machines.findUniqueSshMachineLocation(entity.getLocations()); - if (maybe.isAbsentOrNull()) return null; - return maybe.get(); + return Machines.findUniqueSshMachineLocation(entity.getLocations()).orNull(); } - protected SshFeed(Builder builder) { - super(builder.entity); - machine = checkNotNull(builder.machine != null ? builder.machine : getMachineOfEntity(builder.entity), "machine"); + protected SshFeed(final Builder builder) { + super(builder.entity, builder.onlyIfServiceUp); + machine = builder.machine != null ? 
builder.machine : + new Supplier<SshMachineLocation>() { + @Override + public SshMachineLocation get() { + return Locations.findUniqueSshMachineLocation(entity.getLocations()).get(); + } + }; execAsCommand = builder.execAsCommand; for (SshPollConfig<?> config : builder.polls) { + @SuppressWarnings({ "unchecked", "rawtypes" }) SshPollConfig<?> configCopy = new SshPollConfig(config); if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); String command = config.getCommand(); @@ -184,7 +200,7 @@ public class SshFeed extends AbstractFeed { for (final SshPollIdentifier pollInfo : polls.keySet()) { Set<SshPollConfig<?>> configs = polls.get(pollInfo); long minPeriod = Integer.MAX_VALUE; - Set<AttributePollHandler<SshPollValue>> handlers = Sets.newLinkedHashSet(); + Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet(); for (SshPollConfig<?> config : configs) { handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this)); @@ -196,7 +212,7 @@ public class SshFeed extends AbstractFeed { public SshPollValue call() throws Exception { return exec(pollInfo.command, pollInfo.env); }}, - new DelegatingPollHandler(handlers), + new DelegatingPollHandler<SshPollValue>(handlers), minPeriod); } } @@ -213,13 +229,13 @@ public class SshFeed extends AbstractFeed { int exitStatus; if (execAsCommand) { - exitStatus = machine.execCommands(MutableMap.<String,Object>of("out", stdout, "err", stderr), + exitStatus = machine.get().execCommands(MutableMap.<String,Object>of("out", stdout, "err", stderr), "ssh-feed", ImmutableList.of(command), env); } else { - exitStatus = machine.execScript(MutableMap.<String,Object>of("out", stdout, "err", stderr), + exitStatus = machine.get().execScript(MutableMap.<String,Object>of("out", stdout, "err", stderr), "ssh-feed", ImmutableList.of(command), env); } - return new SshPollValue(machine, exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray())); + return new 
SshPollValue(machine.get(), exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray())); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4e3415f9/core/src/test/java/brooklyn/event/feed/PollerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/event/feed/PollerTest.java b/core/src/test/java/brooklyn/event/feed/PollerTest.java index cfea7e9..0396869 100644 --- a/core/src/test/java/brooklyn/event/feed/PollerTest.java +++ b/core/src/test/java/brooklyn/event/feed/PollerTest.java @@ -34,7 +34,7 @@ public class PollerTest { public void setUp() throws Exception { app = ApplicationBuilder.newManagedApp(TestApplication.class); entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); - poller = new Poller<Integer>(entity); + poller = new Poller<Integer>(entity, false); } @AfterMethod(alwaysRun=true) http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4e3415f9/core/src/test/java/brooklyn/event/feed/ssh/SshFeedIntegrationTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/event/feed/ssh/SshFeedIntegrationTest.java b/core/src/test/java/brooklyn/event/feed/ssh/SshFeedIntegrationTest.java index d1eaeeb..535bd5a 100644 --- a/core/src/test/java/brooklyn/event/feed/ssh/SshFeedIntegrationTest.java +++ b/core/src/test/java/brooklyn/event/feed/ssh/SshFeedIntegrationTest.java @@ -2,13 +2,16 @@ package brooklyn.event.feed.ssh; import static org.testng.Assert.assertTrue; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import brooklyn.entity.basic.ApplicationBuilder; +import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.EntityLocal; +import brooklyn.entity.proxying.EntityInitializer; import brooklyn.entity.proxying.EntitySpec; import 
brooklyn.event.AttributeSensor; import brooklyn.event.basic.Sensors; @@ -16,9 +19,12 @@ import brooklyn.location.basic.LocalhostMachineProvisioningLocation; import brooklyn.location.basic.SshMachineLocation; import brooklyn.test.Asserts; import brooklyn.test.EntityTestUtils; +import brooklyn.test.entity.LocalManagementContextForTests; import brooklyn.test.entity.TestApplication; import brooklyn.test.entity.TestEntity; import brooklyn.util.stream.Streams; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; import com.google.common.base.Function; import com.google.common.base.Predicates; @@ -37,9 +43,9 @@ public class SshFeedIntegrationTest { @BeforeMethod(alwaysRun=true) public void setUp() throws Exception { - loc = new LocalhostMachineProvisioningLocation(); + app = ApplicationBuilder.newManagedApp(TestApplication.class, new LocalManagementContextForTests()); + loc = app.newLocalhostProvisioningLocation(); machine = loc.obtain(); - app = ApplicationBuilder.newManagedApp(TestApplication.class); entity = app.createAndManageChild(EntitySpec.create(TestEntity.class)); app.start(ImmutableList.of(loc)); } @@ -51,6 +57,32 @@ public class SshFeedIntegrationTest { if (loc != null) Streams.closeQuietly(loc); } + /** this is one of the most common pattern */ + @Test(groups="Integration") + public void testReturnsSshStdoutAndInfersMachine() throws Exception { + final TestEntity entity2 = app.createAndManageChild(EntitySpec.create(TestEntity.class) + // inject the machine location, because the app was started with a provisioning location + // and TestEntity doesn't provision + .location(machine)); + + feed = SshFeed.builder() + .entity(entity2) + .poll(new SshPollConfig<String>(SENSOR_STRING) + .command("echo hello") + .onSuccess(SshValueFunctions.stdout())) + .build(); + + Asserts.succeedsEventually(new Runnable() { + public void run() { + String val = entity2.getAttribute(SENSOR_STRING); + assertTrue(val != null); + }}); + + String val = 
entity2.getAttribute(SENSOR_STRING); + Assert.assertTrue(val.contains("hello"), "val="+val); + Assert.assertEquals(val.trim(), "hello"); + } + @Test(groups="Integration") public void testReturnsSshExitStatus() throws Exception { feed = SshFeed.builder() @@ -121,4 +153,36 @@ public class SshFeedIntegrationTest { assertTrue(val != null && val.contains("Exit status 123"), "val=" + val); }}); } + + @Test(groups="Integration") + public void testAddedEarly() throws Exception { + final TestEntity entity2 = app.addChild(EntitySpec.create(TestEntity.class) + .location(machine) + .addInitializer(new EntityInitializer() { + @Override + public void apply(EntityLocal entity) { + SshFeed.builder() + .entity(entity) + .onlyIfServiceUp() + .poll(new SshPollConfig<String>(SENSOR_STRING) + .command("echo hello") + .onSuccess(SshValueFunctions.stdout())) + .build(); + } + })); + Time.sleep(Duration.seconds(2)); + // would be nice to hook in and assert no errors + Assert.assertEquals(entity2.getAttribute(SENSOR_STRING), null); + Entities.manage(entity2); + Time.sleep(Duration.seconds(2)); + Assert.assertEquals(entity2.getAttribute(SENSOR_STRING), null); + entity2.setAttribute(Attributes.SERVICE_UP, true); + + Asserts.succeedsEventually(new Runnable() { + public void run() { + String val = entity2.getAttribute(SENSOR_STRING); + assertTrue(val != null && val.contains("hello"), "val="+val); + }}); + } + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/4e3415f9/software/base/src/main/java/brooklyn/entity/chef/ChefAttributeFeed.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/chef/ChefAttributeFeed.java b/software/base/src/main/java/brooklyn/entity/chef/ChefAttributeFeed.java index be6c7e3..47670c7 100644 --- a/software/base/src/main/java/brooklyn/entity/chef/ChefAttributeFeed.java +++ b/software/base/src/main/java/brooklyn/entity/chef/ChefAttributeFeed.java @@ -83,6 +83,7 @@ public class 
ChefAttributeFeed extends AbstractFeed { @SuppressWarnings("rawtypes") public static class Builder { private EntityLocal entity; + private boolean onlyIfServiceUp = false; private String nodeName; private Map<String, AttributeSensor<?>> sensors = Maps.newHashMap(); private long period = 30; @@ -93,6 +94,11 @@ public class ChefAttributeFeed extends AbstractFeed { this.entity = checkNotNull(val, "entity"); return this; } + public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); } + public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { + this.onlyIfServiceUp = onlyIfServiceUp; + return this; + } public Builder nodeName(String nodeName) { this.nodeName = checkNotNull(nodeName, "nodeName"); return this; @@ -147,7 +153,7 @@ public class ChefAttributeFeed extends AbstractFeed { private final KnifeTaskFactory<String> knifeTaskFactory; protected ChefAttributeFeed(Builder builder) { - super(checkNotNull(builder.entity, "builder.entity")); + super(checkNotNull(builder.entity, "builder.entity"), builder.onlyIfServiceUp); entity = builder.entity; nodeName = checkNotNull(builder.nodeName, "builder.nodeName"); period = builder.period;