This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 83b41d0c82328e93d71d187676427d8a8dc691f0 Author: Alex Heneveld <[email protected]> AuthorDate: Sun Aug 21 17:40:01 2022 +0100 start feeds once entity is fully managed, not before otherwise isManagedActive test is false if not fully managed. also subscribe to service up if feed is enabled only when service is up, and misc tidies. but might be better to leave this as was, and instead use isManagedActiveOrComingUp (and keep the other changes, just revert to starting feeds directly on feed creation. though this change seems better!) --- .../apache/brooklyn/api/objs/EntityAdjunct.java | 6 ++ .../java/org/apache/brooklyn/api/sensor/Feed.java | 2 +- .../camp/brooklyn/SshCommandSensorYamlTest.java | 85 +++++++++++++++++++++- .../apache/brooklyn/core/feed/AbstractFeed.java | 19 ++++- .../org/apache/brooklyn/core/feed/PollConfig.java | 2 +- .../java/org/apache/brooklyn/core/feed/Poller.java | 20 +++-- .../mgmt/internal/EntityManagementSupport.java | 7 +- .../core/mgmt/rebind/BasicEntityRebindSupport.java | 8 -- .../core/sensor/AbstractAddTriggerableSensor.java | 2 + .../core/sensor/http/HttpRequestSensor.java | 2 + .../brooklyn/core/sensor/ssh/SshCommandSensor.java | 2 +- .../apache/brooklyn/feed/AbstractCommandFeed.java | 6 +- .../brooklyn/feed/function/FunctionFeed.java | 16 +--- .../org/apache/brooklyn/feed/http/HttpFeed.java | 10 ++- .../org/apache/brooklyn/feed/shell/ShellFeed.java | 5 +- .../policy/enricher/HttpLatencyDetector.java | 3 +- .../brooklynnode/BrooklynEntityMirrorImpl.java | 3 +- .../java/org/apache/brooklyn/feed/jmx/JmxFeed.java | 6 +- .../brooklyn/tasks/kubectl/ContainerSensor.java | 3 +- .../core/sensor/windows/WinRmCommandSensor.java | 3 +- .../windows/WindowsPerformanceCounterFeed.java | 6 +- .../org/apache/brooklyn/util/time/Duration.java | 4 +- 22 files changed, 161 insertions(+), 59 deletions(-) diff --git a/api/src/main/java/org/apache/brooklyn/api/objs/EntityAdjunct.java b/api/src/main/java/org/apache/brooklyn/api/objs/EntityAdjunct.java index af55fecd63..9c7bac58c2 100644 --- a/api/src/main/java/org/apache/brooklyn/api/objs/EntityAdjunct.java +++ b/api/src/main/java/org/apache/brooklyn/api/objs/EntityAdjunct.java @@ -55,4 +55,10 @@ public interface EntityAdjunct extends BrooklynObject { @Nullable String getUniqueTag(); Map<String, HighlightTuple> getHighlights(); + + interface AutoStartEntityAdjunct extends EntityAdjunct { + /** for things that should start when the entity is managed, including on rebind; + * replaces logic which started things during creation time */ + public void start(); + } } diff --git a/api/src/main/java/org/apache/brooklyn/api/sensor/Feed.java b/api/src/main/java/org/apache/brooklyn/api/sensor/Feed.java index 01a47148fb..a7364f1973 100644 --- a/api/src/main/java/org/apache/brooklyn/api/sensor/Feed.java +++ b/api/src/main/java/org/apache/brooklyn/api/sensor/Feed.java @@ -39,7 +39,7 @@ import com.google.common.annotations.Beta; * </ul> */ @Beta -public interface Feed extends EntityAdjunct, Rebindable { +public interface Feed extends EntityAdjunct, EntityAdjunct.AutoStartEntityAdjunct, Rebindable { /** * True if everything has been _started_ (or it is starting) but not stopped, diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java index e71713e775..644909b555 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlTest.java @@ -21,6 +21,7 @@ package org.apache.brooklyn.camp.brooklyn; import java.util.List; import java.util.Map; +import com.google.common.base.Stopwatch; import org.apache.brooklyn.api.entity.Application; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; @@ -31,10 +32,12 @@ import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.entity.TestApplication; import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcess; import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool; import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool.CustomResponse; import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool.ExecParams; import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.BeforeMethod; @@ -199,5 +202,85 @@ public class SshCommandSensorYamlTest extends AbstractYamlTest { protected Logger getLogger() { return log; } - + + @Test + public void testSshCommandSensorFeedRunsAtStartup() throws Exception { + RecordingSshTool.setCustomResponse(".*myCommand.*", new RecordingSshTool.CustomResponse(0, "myResponse", null)); + + Entity app = createAndStartApplication( + "location:", + " localhost:", + " sshToolClass: "+RecordingSshTool.class.getName(), + "services:", + "- type: " + VanillaSoftwareProcess.class.getName(), + " brooklyn.config:", + " onbox.base.dir.skipResolution: true", + " brooklyn.initializers:", + " - type: org.apache.brooklyn.core.sensor.ssh.SshCommandSensor", + " brooklyn.config:", + " name: mySensor", + " command: myCommand", + " period: 5s"); + waitForApplicationTasks(app); + + VanillaSoftwareProcess entity = (VanillaSoftwareProcess) Iterables.getOnlyElement(app.getChildren()); + Stopwatch sw = Stopwatch.createStarted(); + EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newStringSensor("mySensor"), "myResponse"); + Asserts.assertThat(Duration.of(sw), d -> d.isShorterThan(Duration.seconds(4))); + } + + + @Test(groups="Integration") // because slow + public void testSshCommandSensorPeriodicFeedServiceUpFalseDoesNotRunAtStartup() throws Exception { + RecordingSshTool.setCustomResponse(".*myCommand.*", new RecordingSshTool.CustomResponse(0, "myResponse", null)); + + Stopwatch sw = Stopwatch.createStarted(); + Entity app = createAndStartApplication( + "location:", + " localhost:", + " sshToolClass: "+RecordingSshTool.class.getName(), + "services:", + "- type: " + VanillaSoftwareProcess.class.getName(), + " brooklyn.config:", + " onbox.base.dir.skipResolution: true", + " brooklyn.initializers:", + " - type: org.apache.brooklyn.core.sensor.ssh.SshCommandSensor", + " brooklyn.config:", + " name: mySensor", + " command: myCommand", + " period: 5s", + " onlyIfServiceUp: true"); + waitForApplicationTasks(app); + + VanillaSoftwareProcess entity = (VanillaSoftwareProcess) Iterables.getOnlyElement(app.getChildren()); + EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newStringSensor("mySensor"), "myResponse"); + Asserts.assertThat(Duration.of(sw), d -> d.isLongerThan(Duration.seconds(4))); + } + + @Test + public void testSshCommandSensorTriggeredFeedDoesRunAtStartup() throws Exception { + RecordingSshTool.setCustomResponse(".*myCommand.*", new RecordingSshTool.CustomResponse(0, "myResponse", null)); + + Entity app = createAndStartApplication( + "location:", + " localhost:", + " sshToolClass: "+RecordingSshTool.class.getName(), + "services:", + "- type: " + VanillaSoftwareProcess.class.getName(), + " brooklyn.config:", + " onbox.base.dir.skipResolution: true", + " brooklyn.initializers:", + " - type: org.apache.brooklyn.core.sensor.ssh.SshCommandSensor", + " brooklyn.config:", + " name: mySensor", + " command: myCommand", + " triggers:", + " - triggerSensor"); + waitForApplicationTasks(app); + + VanillaSoftwareProcess entity = (VanillaSoftwareProcess) Iterables.getOnlyElement(app.getChildren()); + EntityAsserts.assertAttributeEqualsEventually(MutableMap.of("timeout", Duration.seconds(4)), entity, Sensors.newStringSensor("mySensor"), "myResponse"); + } + + } diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java index 7a50fff4db..43c4f7a69b 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java @@ -20,6 +20,8 @@ package org.apache.brooklyn.core.feed; import java.util.Collection; +import com.google.common.annotations.Beta; +import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.mgmt.rebind.RebindSupport; import org.apache.brooklyn.api.mgmt.rebind.mementos.FeedMemento; @@ -28,6 +30,7 @@ import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.BrooklynFeatureEnablement; import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityAdjuncts; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.rebind.BasicFeedRebindSupport; @@ -38,6 +41,8 @@ import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.google.common.base.Preconditions.checkNotNull; + /** * Captures common fields and processes for sensor feeds. * These generally poll or subscribe to get sensor values for an entity. @@ -56,7 +61,17 @@ public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed public AbstractFeed() { } - + + @Beta + public static <T extends AbstractFeed> T initAndMaybeStart(T feed, Entity entity) { + feed.setEntity(checkNotNull((EntityInternal)entity, "entity")); + if (Entities.isManagedActive(entity)) { + // start it is entity is already managed (dynamic addition); otherwise rely on EntityManagementSupport to start us (initializer-based addition and after rebind) + feed.start(); + } + return feed; + } + // Ensure idempotent, as called in builders (in case not registered with entity), and also called // when registering with entity @Override @@ -96,7 +111,7 @@ public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed @Override public void start() { if (log.isDebugEnabled()) log.debug("Starting feed {} for {}", this, entity); - if (activated) { + if (activated) { throw new IllegalStateException(String.format("Attempt to start feed %s of entity %s when already running", this, entity)); } diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java index 4533a73b7a..8e6893065e 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java @@ -101,7 +101,7 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig< @Override protected MutableList<Object> toStringOtherFields() { MutableList<Object> result = super.toStringOtherFields().appendIfNotNull(description); - if (period>0 && period <= Duration.PRACTICALLY_FOREVER.toMilliseconds()) result.append("period: "+Duration.of(period)); + if (period>0 && period < Duration.PRACTICALLY_FOREVER.toMilliseconds()) result.append("period: "+Duration.of(period)); if (otherTriggers!=null) result.append("triggers: "+otherTriggers); return result; } diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java index 2b1a09488e..3b2c331a09 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java @@ -90,12 +90,19 @@ public class Poller<V> { DelegatingPollHandler handlerDelegate = new DelegatingPollHandler(handlers); boolean subscribed = false; for (PollConfig pc: configs) { + Set<Pair<Entity, Sensor>> triggersResolved = MutableSet.of(); if (pc.getOtherTriggers()!=null) { - List<Pair<Entity, Sensor>> triggersResolved = AbstractAddTriggerableSensor.resolveTriggers(feed.getEntity(), pc.getOtherTriggers()); - for (Pair<Entity, Sensor> pair : triggersResolved) {// TODO initial, condition - subscribe(pollJob, handlerDelegate, pair.getLeft(), pair.getRight(), pc.getCondition()); - subscribed = true; - } + triggersResolved.addAll(AbstractAddTriggerableSensor.resolveTriggers(feed.getEntity(), pc.getOtherTriggers())); + } + if (onlyIfServiceUp) { + // if 'onlyIfServiceUp' is set then automatically subscribe to that sensor. + // this is the default for ssh and other sensors which need a target machine. for others it defaults false. + triggersResolved.add(Pair.of(feed.getEntity(), Attributes.SERVICE_UP)); + } + + for (Pair<Entity, Sensor> pair : triggersResolved) { + subscribe(pollJob, handlerDelegate, pair.getLeft(), pair.getRight(), pc.getCondition()); + subscribed = true; } } if (minPeriodMillis>0 && (minPeriodMillis < Duration.PRACTICALLY_FOREVER.toMilliseconds() || !subscribed)) { @@ -124,7 +131,6 @@ public class Poller<V> { this.pollTriggerEntity = sensorSource; this.pollTriggerSensor = sensor; this.pollCondition = pollCondition; - wrappedJob = new Runnable() { @Override public void run() { @@ -200,7 +206,6 @@ public class Poller<V> { throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running", this, entity)); } - started = true; for (final Callable<?> oneOffJob : oneOffJobs) { @@ -216,6 +221,7 @@ public class Poller<V> { Callable<Task<?>> tf = () -> { DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), + /** TODO why the hell is this running before the entity is managed??? and remove logging, and invocationCount=100. */ new Callable<Void>() { @Override public Void call() { if (!Entities.isManagedActive(entity)) { return null; diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java index 7e71a9cc57..a758f21000 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EntityManagementSupport.java @@ -210,7 +210,7 @@ public class EntityManagementSupport { } /* - * TODO framework starting events - phase 1, including rebind + * framework starting events - phase 1, including rebind * - establish hierarchy (child, groups, etc; construction if necessary on rebind) * - set location * - set local config values @@ -223,6 +223,11 @@ public class EntityManagementSupport { if (!isReadOnly()) { entity.onManagementStarting(); + + // start those policies etc which are labelled as auto-start + entity.policies().forEach(adj -> { if (adj instanceof EntityAdjunct.AutoStartEntityAdjunct) ((EntityAdjunct.AutoStartEntityAdjunct)adj).start(); }); + entity.enrichers().forEach(adj -> { if (adj instanceof EntityAdjunct.AutoStartEntityAdjunct) ((EntityAdjunct.AutoStartEntityAdjunct)adj).start(); }); + entity.feeds().forEach(f -> { if (!f.isActivated()) f.start(); }); } } catch (Throwable t) { managementFailed.set(true); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java index 2f2293d1e8..c4f7ba9a74 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicEntityRebindSupport.java @@ -159,14 +159,6 @@ public class BasicEntityRebindSupport extends AbstractBrooklynObjectRebindSuppor } catch (Exception e) { rebindContext.getExceptionHandler().onAddFeedFailed(entity, feed, e); } - - try { - if (!rebindContext.isReadOnly(feed)) { - feed.start(); - } - } catch (Exception e) { - rebindContext.getExceptionHandler().onRebindFailed(BrooklynObjectType.ENTITY, entity, e); - } } else { LOG.warn("Feed not found; discarding feed {} of entity {}({})", new Object[] {feedId, memento.getType(), memento.getId()}); diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java index 0d28f31b7f..e555e46ad4 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java @@ -53,6 +53,8 @@ public abstract class AbstractAddTriggerableSensor<T> extends AbstractAddSensorF "Sensors which should trigger this feed, supplied with list of maps containing sensor (name or sensor instance) and entity (ID or entity instance), or just sensor names or just one sensor"); public static final ConfigKey<DslPredicates.DslPredicate> CONDITION = ConfigKeys.newConfigKey(DslPredicates.DslPredicate.class, "condition", "Optional condition required for this sensor feed to run"); + public static final ConfigKey<Boolean> ONLY_IF_SERVICE_UP = ConfigKeys.newBooleanConfigKey("onlyIfServiceUp", "Whether to run only if service is up.", null); + protected AbstractAddTriggerableSensor() {} public AbstractAddTriggerableSensor(ConfigBag parameters) { super(parameters); diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java index e078eaffbe..0c6932badc 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java @@ -37,6 +37,7 @@ import org.apache.brooklyn.feed.http.HttpValueFunctions; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.guava.Functionals; +import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.http.HttpToolResponse; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; @@ -135,6 +136,7 @@ public class HttpRequestSensor<T> extends AbstractAddTriggerableSensor<T> { .baseUri(uri) .credentialsIfNotNull(username, password) .preemptiveBasicAuth(Boolean.TRUE.equals(preemptiveBasicAuth)) + .onlyIfServiceUp(Maybe.ofDisallowingNull(EntityInitializers.resolve(initParams(), ONLY_IF_SERVICE_UP)).or(false)) .poll(pollConfig); if (headers != null) { diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java index cdc77112ad..3632257361 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java @@ -165,7 +165,7 @@ public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> { SshFeed feed = SshFeed.builder() .entity(entity) - .onlyIfServiceUp() + .onlyIfServiceUp(Maybe.ofDisallowingNull(EntityInitializers.resolve(params, ONLY_IF_SERVICE_UP)).or(true)) .poll(pollConfig) .build(); diff --git a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java index c4fab5c647..5a9921d166 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java +++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java @@ -38,6 +38,7 @@ import org.apache.brooklyn.core.feed.DelegatingPollHandler; import org.apache.brooklyn.core.feed.Poller; import org.apache.brooklyn.core.location.Locations; import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; +import org.apache.brooklyn.feed.function.FunctionFeed; import org.apache.brooklyn.feed.ssh.SshPollValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,10 +158,7 @@ public abstract class AbstractCommandFeed extends AbstractFeed { public T build() { built = true; - T result = instantiateFeed(); - result.setEntity(checkNotNull((EntityLocal)entity, "entity")); - result.start(); - return result; + return AbstractFeed.initAndMaybeStart(instantiateFeed(), entity); } @Override diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java index 378bfe6b56..c9119d13c7 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java +++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java @@ -21,23 +21,13 @@ package org.apache.brooklyn.feed.function; import static com.google.common.base.Preconditions.checkNotNull; import java.util.List; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.feed.AbstractFeed; -import org.apache.brooklyn.core.feed.AttributePollHandler; -import org.apache.brooklyn.core.feed.DelegatingPollHandler; -import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; -import org.apache.brooklyn.util.core.javalang.BrooklynHttpConfig; -import org.apache.brooklyn.util.http.HttpToolResponse; -import org.apache.brooklyn.util.http.auth.UsernamePassword; -import org.apache.brooklyn.util.http.executor.HttpRequest; -import org.apache.brooklyn.util.http.executor.HttpResponse; import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +36,6 @@ import com.google.common.base.Objects; import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; import com.google.common.reflect.TypeToken; /** @@ -146,10 +135,7 @@ public class FunctionFeed extends AbstractFeed { } public FunctionFeed build() { built = true; - FunctionFeed result = new FunctionFeed(this); - result.setEntity(checkNotNull((EntityInternal)entity, "entity")); - result.start(); - return result; + return AbstractFeed.initAndMaybeStart(new FunctionFeed(this), entity); } @Override protected void finalize() { diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java index 4a17f04b84..8af0dcfd86 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java +++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java @@ -38,6 +38,7 @@ import org.apache.brooklyn.core.location.Locations; import org.apache.brooklyn.core.location.Machines; import org.apache.brooklyn.core.location.internal.LocationInternal; import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; +import org.apache.brooklyn.feed.function.FunctionFeed; import org.apache.brooklyn.util.core.javalang.BrooklynHttpConfig; import org.apache.brooklyn.util.executor.HttpExecutorFactory; import org.apache.brooklyn.util.guava.Maybe; @@ -253,11 +254,18 @@ public class HttpFeed extends AbstractFeed { } } public HttpFeed build() { + return build(null); + } + /** normally no arg is required, but if feed is not attached to entity, it will need starting here */ + public HttpFeed build(Boolean feedStart) { built = true; HttpFeed result = new HttpFeed(this); result.setEntity(checkNotNull((EntityLocal)entity, "entity")); if (suspended) result.suspend(); - result.start(); + if ((feedStart==null && Entities.isManagedActive(entity)) || Boolean.TRUE.equals(feedStart)) { + // this feed is used a lot without being attached to an entity, not ideal, but let's support it + result.start(); + } return result; } @Override diff --git a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java index ab617f1f39..57dbba4121 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java +++ b/core/src/main/java/org/apache/brooklyn/feed/shell/ShellFeed.java @@ -138,10 +138,7 @@ public class ShellFeed extends AbstractFeed { } public ShellFeed build() { built = true; - ShellFeed result = new ShellFeed(this); - result.setEntity(checkNotNull((EntityLocal)entity, "entity")); - result.start(); - return result; + return AbstractFeed.initAndMaybeStart(new ShellFeed(this), entity); } @Override protected void finalize() { diff --git a/policy/src/main/java/org/apache/brooklyn/policy/enricher/HttpLatencyDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/enricher/HttpLatencyDetector.java index 95d29b628e..d8a62740ba 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/enricher/HttpLatencyDetector.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/enricher/HttpLatencyDetector.java @@ -151,7 +151,7 @@ public class HttpLatencyDetector extends AbstractEnricher implements Enricher { .onResult(new ComputeLatencyAndRecordError()) .setOnException(null)) .suspended() - .build(); + .build(true); if (getUniqueTag()==null) uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+ @@ -195,6 +195,7 @@ public class HttpLatencyDetector extends AbstractEnricher implements Enricher { Boolean currentVal = entity.getAttribute(Startable.SERVICE_UP); if (currentVal != null) { AtomicReferences.setIfDifferent(serviceUp, currentVal); + updateEnablement(); } } diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java index 8b28607a05..7ff2c3a03c 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java @@ -139,7 +139,8 @@ public class BrooklynEntityMirrorImpl extends AbstractEntity implements Brooklyn return null; } })) - .poll(HttpPollConfig.forSensor(MIRROR_SUMMARY).onSuccess(new MirrorSummary())).build(); + .poll(HttpPollConfig.forSensor(MIRROR_SUMMARY).onSuccess(new MirrorSummary())) + .build(true); populateEffectors(); } diff --git a/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxFeed.java b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxFeed.java index 4220d05118..6789c9e374 100644 --- a/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxFeed.java +++ b/software/base/src/main/java/org/apache/brooklyn/feed/jmx/JmxFeed.java @@ -41,6 +41,7 @@ import org.apache.brooklyn.core.feed.DelegatingPollHandler; import org.apache.brooklyn.core.feed.PollHandler; import org.apache.brooklyn.core.feed.Poller; import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl; +import org.apache.brooklyn.feed.windows.WindowsPerformanceCounterFeed; import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,10 +168,7 @@ public class JmxFeed extends AbstractFeed { } public JmxFeed build() { built = true; - JmxFeed result = new JmxFeed(this); - result.setEntity(checkNotNull((EntityLocal)entity, "entity")); - result.start(); - return result; + return AbstractFeed.initAndMaybeStart(new JmxFeed(this), entity); } @Override protected void finalize() { diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java index dbcf071622..a757c239e8 100644 --- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java +++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java @@ -32,6 +32,7 @@ import org.apache.brooklyn.feed.function.FunctionFeed; import org.apache.brooklyn.feed.function.FunctionPollConfig; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.guava.Maybe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +70,7 @@ public class ContainerSensor<T> extends AbstractAddTriggerableSensor<T> implemen ((EntityInternal) entity).feeds().add(FunctionFeed.builder() .entity(entity) - .onlyIfServiceUp() + .onlyIfServiceUp(Maybe.ofDisallowingNull(EntityInitializers.resolve(initParams(), ONLY_IF_SERVICE_UP)).or(false)) .poll(poll) .build()); } diff --git a/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java b/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java index 08d9a2ec03..8e86435d96 100644 --- a/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java +++ b/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java @@ -42,6 +42,7 @@ import org.apache.brooklyn.util.core.internal.winrm.WinRmTool; import org.apache.brooklyn.util.core.json.ShellEnvironmentSerializer; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; @@ -136,7 +137,7 @@ public final class WinRmCommandSensor<T> extends AbstractAddTriggerableSensor<T> CmdFeed feed = CmdFeed.builder() .entity(entity) - .onlyIfServiceUp() + .onlyIfServiceUp(Maybe.ofDisallowingNull(EntityInitializers.resolve(initParams(), ONLY_IF_SERVICE_UP)).or(true)) .poll(pollConfig) .build(); diff --git a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java index 55b273ee74..4840d6ade0 100644 --- a/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java +++ b/software/winrm/src/main/java/org/apache/brooklyn/feed/windows/WindowsPerformanceCounterFeed.java @@ -46,6 +46,7 @@ import org.apache.brooklyn.core.feed.PollHandler; import org.apache.brooklyn.core.feed.Poller; import org.apache.brooklyn.core.location.Machines; import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.feed.function.FunctionFeed; import org.apache.brooklyn.location.winrm.WinRmMachineLocation; import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.core.internal.winrm.WinRmToolResponse; @@ -150,10 +151,7 @@ public class WindowsPerformanceCounterFeed extends AbstractFeed { } public WindowsPerformanceCounterFeed build() { built = true; - WindowsPerformanceCounterFeed result = new WindowsPerformanceCounterFeed(this); - result.setEntity(checkNotNull((EntityLocal)entity, "entity")); - result.start(); - return result; + return AbstractFeed.initAndMaybeStart(new WindowsPerformanceCounterFeed(this), entity); } @Override protected void finalize() { diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/time/Duration.java b/utils/common/src/main/java/org/apache/brooklyn/util/time/Duration.java index aaf562b072..e06315dfc0 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/time/Duration.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/time/Duration.java @@ -50,6 +50,8 @@ public class Duration implements Comparable<Duration>, Serializable { /** longest supported duration, 2^{63}-1 nanoseconds, approx ten billion seconds, or 300 years */ public static final Duration PRACTICALLY_FOREVER = of(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + /** used to indicate forever */ + private static final Duration ALMOST_PRACTICALLY_FOREVER = PRACTICALLY_FOREVER.subtract(Duration.days(365*50)); private final long nanos; @@ -71,7 +73,7 @@ public class Duration implements Comparable<Duration>, Serializable { @Override public String toString() { - if (equals(PRACTICALLY_FOREVER)) return "forever"; + if (isLongerThan(ALMOST_PRACTICALLY_FOREVER)) return "forever"; return Time.makeTimeStringExact(this); }
