This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 86e75d683ec25c505ce63ae270b02f5693d93e59 Author: Alex Heneveld <[email protected]> AuthorDate: Tue Aug 16 12:05:04 2022 +0100 allow common feeds to be triggered from sensors also improve names/display/unique-tag of some common sensors --- .../apache/brooklyn/core/feed/AbstractFeed.java | 7 +- .../brooklyn/core/feed/AttributePollHandler.java | 2 +- .../org/apache/brooklyn/core/feed/PollConfig.java | 19 +- .../java/org/apache/brooklyn/core/feed/Poller.java | 118 +++++++++--- .../core/sensor/AbstractAddSensorFeed.java | 17 ++ .../core/sensor/AbstractAddTriggerableSensor.java | 213 +++++++++++++++++++++ .../core/sensor/http/HttpRequestSensor.java | 11 +- .../brooklyn/core/sensor/ssh/SshCommandSensor.java | 16 +- .../brooklyn/entity/group/DynamicClusterImpl.java | 1 + .../entity/group/DynamicMultiGroupImpl.java | 1 + .../apache/brooklyn/feed/AbstractCommandFeed.java | 16 +- .../brooklyn/feed/function/FunctionFeed.java | 18 +- .../brooklyn/feed/function/FunctionPollConfig.java | 10 +- .../org/apache/brooklyn/feed/http/HttpFeed.java | 7 +- .../entity/software/base/SoftwareProcessImpl.java | 1 + .../brooklyn/tasks/kubectl/ContainerCommons.java | 4 + .../brooklyn/tasks/kubectl/ContainerSensor.java | 62 +++--- .../tasks/kubectl/ContainerTaskFactory.java | 9 +- .../tasks/kubectl/ContainerSensorTest.java | 33 ++++ .../core/sensor/windows/WinRmCommandSensor.java | 14 +- 20 files changed, 471 insertions(+), 108 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java index 70ef397ac1..7a50fff4db 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java @@ -201,7 +201,6 @@ public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed @Override protected void onChanged() { - // TODO Auto-generated method stub } /** @@ -209,7 +208,7 @@ public abstract class 
AbstractFeed extends AbstractEntityAdjunct implements Feed */ protected void preStart() { } - + /** * For overriding. */ @@ -233,6 +232,10 @@ public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed highlightTriggers("Running every "+minPeriod); } + public void highlightTriggers(String message) { + super.highlightTriggers(message); + } + void onRemoveSensor(Sensor<?> sensor) { highlightActionPublishSensor("Clear sensor "+sensor.getName()); } diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java index c59916984d..84e4d852ba 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java @@ -233,7 +233,7 @@ public class AttributePollHandler<V> implements PollHandler<V> { @Override public String getDescription() { - return sensor.getName()+" @ "+entity.getId()+" <- "+config; + return sensor.getName() + " " /*+entity.getId()*/ +" <- " + config; } protected String getBriefDescription() { diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java index d9990538e5..7c8144811d 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java @@ -34,6 +34,7 @@ import org.apache.brooklyn.util.time.Duration; public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig<V, T, F> { private long period = -1; + private Object otherTriggers; private String description; public PollConfig(AttributeSensor<T> sensor) { @@ -43,6 +44,8 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig< public PollConfig(PollConfig<V,T,F> other) { super(other); this.period = other.period; + this.otherTriggers = other.otherTriggers; + this.description = 
other.description; } public long getPeriod() { @@ -69,13 +72,25 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends FeedConfig< this.description = description; return self(); } - + + public F otherTriggers(Object otherTriggers) { + this.otherTriggers = otherTriggers; + return self(); + } + + public Object getOtherTriggers() { + return otherTriggers; + } + public String getDescription() { return description; } @Override protected MutableList<Object> toStringOtherFields() { - return super.toStringOtherFields().appendIfNotNull(description); + MutableList<Object> result = super.toStringOtherFields().appendIfNotNull(description); + if (period>0 && period <= Duration.PRACTICALLY_FOREVER.toMilliseconds()) result.append("period: "+Duration.of(period)); + if (otherTriggers!=null) result.append("triggers: "+otherTriggers); + return result; } @Override diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java index 6049ecf0e5..997c34fcf3 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java @@ -23,14 +23,20 @@ import java.util.Set; import java.util.concurrent.Callable; import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.mgmt.SubscriptionHandle; import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.task.DynamicSequentialTask; import org.apache.brooklyn.util.core.task.ScheduledTask; import 
org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,11 +67,20 @@ public class Poller<V> { final PollHandler<? super V> handler; final Duration pollPeriod; final Runnable wrappedJob; + final Entity sensorSource; + final Sensor<?> sensor; + SubscriptionHandle subscription; private boolean loggedPreviousException = false; - + PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period) { + this(job, handler, period, null, null); + } + + PollJob(final Callable<V> job, final PollHandler<? super V> handler, Duration period, Entity sensorSource, Sensor<?> sensor) { this.handler = handler; this.pollPeriod = period; + this.sensorSource = sensorSource; + this.sensor = sensor; wrappedJob = new Runnable() { @Override @@ -122,11 +137,12 @@ public class Poller<V> { pollJobs.add(foo); } + public void subscribe(Callable<V> job, PollHandler<? super V> handler, Entity sensorSource, Sensor<?> sensor) { + pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor)); + } + @SuppressWarnings({ "unchecked" }) public void start() { - // TODO Previous incarnation of this logged this logged polledSensors.keySet(), but we don't know that anymore - // Is that ok, are can we do better? 
- if (log.isDebugEnabled()) log.debug("Starting poll for {} (using {})", new Object[] {entity, this}); if (started) { throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running", @@ -141,26 +157,32 @@ public class Poller<V> { } Duration minPeriod = null; + Set<String> sensors = MutableSet.of(); for (final PollJob<V> pollJob : pollJobs) { - final String scheduleName = pollJob.handler.getDescription(); - if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) { - ScheduledTask t = ScheduledTask.builder(() -> { - DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), - new Callable<Void>() { @Override public Void call() { - if (!Entities.isManagedActive(entity)) { - return null; - } - if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) { - return null; - } - pollJob.wrappedJob.run(); - return null; - } } ); - // explicitly make non-transient -- we want to see its execution, even if parent is transient - BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG); - return task; - }) - .displayName("scheduled:" + scheduleName) + final String scheduleName = (feed!=null ? 
feed.getDisplayName()+", " : "") +pollJob.handler.getDescription(); + boolean added = false; + + Callable<Task<?>> tf = () -> { + DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), + new Callable<Void>() { @Override public Void call() { + if (!Entities.isManagedActive(entity)) { + return null; + } + if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) { + return null; + } + pollJob.wrappedJob.run(); + return null; + } } ); + // explicitly make non-transient -- we want to see its execution, even if parent is transient + BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG); + return task; + }; + + if (pollJob.pollPeriod!=null && pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) { + added =true; + ScheduledTask t = ScheduledTask.builder(tf) + .displayName("Periodic: " + scheduleName) .period(pollJob.pollPeriod) .cancelOnException(false) .tag(feed!=null ? BrooklynTaskTags.tagForContextAdjunct(feed) : null) @@ -169,13 +191,43 @@ public class Poller<V> { if (minPeriod==null || (pollJob.pollPeriod.isShorterThan(minPeriod))) { minPeriod = pollJob.pollPeriod; } - } else { - if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this}); + } + + if (pollJob.sensor!=null) { + added = true; + if (pollJob.subscription!=null) { + throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already has subscription %s", + this, entity, pollJob.subscription)); + } + sensors.add(pollJob.sensor.getName()); + pollJob.subscription = feed.subscriptions().subscribe(pollJob.sensorSource!=null ? 
pollJob.sensorSource : feed.getEntity(), pollJob.sensor, event -> { + // submit this on every event + try { + feed.getExecutionContext().submit(tf.call()); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + }); + } + + if (!added) { + if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as period {} and no subscriptions) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this}); } } - if (minPeriod!=null && feed!=null) { - feed.highlightTriggerPeriod(minPeriod); + if (feed!=null) { + if (sensors.isEmpty()) { + if (minPeriod==null) { + feed.highlightTriggers("Not configured with a period or triggers"); + } else { + feed.highlightTriggerPeriod(minPeriod); + } + } else if (minPeriod==null) { + feed.highlightTriggers("Triggered by: "+sensors); + } else { + // both + feed.highlightTriggers("Running every "+minPeriod+" and on triggers: "+sensors); + } } } @@ -193,6 +245,12 @@ public class Poller<V> { for (ScheduledTask task : tasks) { if (task != null) task.cancel(); } + for (PollJob<?> j: pollJobs) { + if (j.subscription!=null) { + feed.subscriptions().unsubscribe(j.subscription); + j.subscription = null; + } + } oneOffTasks.clear(); tasks.clear(); } @@ -205,10 +263,14 @@ public class Poller<V> { break; } } + boolean hasSubscriptions = pollJobs.stream().anyMatch(j -> j.subscription!=null); if (!started && hasActiveTasks) { log.warn("Poller should not be running, but has active tasks, tasks: "+tasks); } - return started && hasActiveTasks; + if (!started && hasSubscriptions) { + log.warn("Poller should not be running, but has subscriptions on jobs: "+pollJobs); + } + return started && (hasActiveTasks || hasSubscriptions); } protected boolean isEmpty() { diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java index d7cf001235..02dd58e0ff 100644 --- 
a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java @@ -18,14 +18,30 @@ */ package org.apache.brooklyn.core.sensor; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Predicates; +import com.google.common.reflect.TypeToken; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.effector.AddSensor; import org.apache.brooklyn.core.effector.AddSensorInitializer; +import org.apache.brooklyn.core.entity.BrooklynConfigKeys; +import org.apache.brooklyn.core.entity.EntityPredicates; +import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser; import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.time.Duration; import com.google.common.annotations.Beta; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; /** * Super-class for entity initializers that add feeds. @@ -55,4 +71,5 @@ public abstract class AbstractAddSensorFeed<T> extends AddSensorInitializer<T> { public AbstractAddSensorFeed(final ConfigBag params) { super(params); } + } diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java new file mode 100644 index 0000000000..b443747356 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.sensor; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.annotations.Beta; +import com.google.common.base.Function; +import com.google.common.base.Predicates; +import com.google.common.reflect.TypeToken; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.BasicConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.AddSensorInitializer; +import org.apache.brooklyn.core.enricher.AbstractEnricher; +import org.apache.brooklyn.core.entity.BrooklynConfigKeys; +import org.apache.brooklyn.core.entity.EntityInitializers; +import org.apache.brooklyn.core.entity.EntityPredicates; +import org.apache.brooklyn.core.feed.*; +import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser; +import org.apache.brooklyn.feed.http.HttpPollConfig; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.Tasks; +import 
org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.http.HttpToolResponse; +import org.apache.brooklyn.util.javalang.AtomicReferences; +import org.apache.brooklyn.util.time.Duration; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.*; +import java.util.concurrent.Callable; +import java.util.stream.Collectors; + +import static com.fasterxml.jackson.databind.type.LogicalType.Collection; + +/** + * Super-class for entity initializers that add feeds. + */ +@Beta +public abstract class AbstractAddTriggerableSensor<T> extends AbstractAddSensorFeed<T> { + + public static final ConfigKey<Object> SENSOR_TRIGGERS = ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers", + "Sensors which should trigger this feed, supplied with list of maps containing sensor (name or sensor instance) and entity (ID or entity instance), or just sensor names or just one sensor"); + + protected AbstractAddTriggerableSensor() {} + public AbstractAddTriggerableSensor(ConfigBag parameters) { + super(parameters); + } + + public static <V> void scheduleWithTriggers(AbstractFeed feed, Poller<V> poller, Callable<V> pollJob, PollHandler<V> handler, long minPeriod, Set<? 
extends PollConfig> configs) { + // the logic for feeds with pollers is unnecessarily convoluted; for now we try to standardize by routing calls that take other triggers + // through this method; would be nice to clean up (but a big job) + + if (minPeriod>0 && minPeriod < Duration.PRACTICALLY_FOREVER.toMilliseconds()) { + poller.scheduleAtFixedRate(pollJob, handler, minPeriod); + } + for (PollConfig pc: configs) { + if (pc.getOtherTriggers()!=null) { + List<Pair<Entity, Sensor>> triggersResolved = resolveTriggers(feed.getEntity(), pc.getOtherTriggers()); + triggersResolved.forEach(pair -> { + poller.subscribe(pollJob, handler, pair.getLeft(), pair.getRight()); + }); + } + } + } + + @JsonIgnore + protected Duration getPeriod(Entity context, ConfigBag config) { + if (config.containsKey(SENSOR_PERIOD) || !hasTriggers(config)) { + if (context!=null) return Tasks.resolving(config, SENSOR_PERIOD).context(context).immediately(true).get(); + else return config.get(SENSOR_PERIOD); + } + return Duration.PRACTICALLY_FOREVER; + } + + @JsonIgnore + protected Maybe<Object> getTriggersMaybe(Entity context, ConfigBag config) { + return Tasks.resolving(config, SENSOR_TRIGGERS).context(context).deep().immediately(true).getMaybe(); + } + + static List<Pair<Entity,Sensor>> resolveTriggers(Entity context, Object otherTriggers) { + Object triggers = Tasks.resolving(otherTriggers, Object.class).context(context).deep().immediately(true).get(); + + if (triggers==null || (triggers instanceof Collection && ((Collection)triggers).isEmpty())) return Collections.emptyList(); + if (triggers instanceof String) { + SensorFeedTrigger t = new SensorFeedTrigger(); + t.sensorName = (String)triggers; + triggers = MutableList.of(t); + } + if (!(triggers instanceof Collection)) { + throw new IllegalStateException("Triggers should be a list containing sensors or sensor names"); + } + + return ((Collection<?>)triggers).stream().map(ti -> { + SensorFeedTrigger t; + + if (ti instanceof SensorFeedTrigger) { + 
t = (SensorFeedTrigger) ti; + } else { + if (ti instanceof Map) { + t = Tasks.resolving(ti, SensorFeedTrigger.class).context(context).deep().get(); + } else if (ti instanceof String) { + t = new SensorFeedTrigger(); + t.sensorName = (String) ti; + } else { + throw new IllegalStateException("Trigger should be a map specifyin entity and sensor"); + } + } + + Entity entity = t.entity; + if (entity==null && t.entityId!=null) { + String desiredComponentId = t.entityId; + List<Entity> firstGroupOfMatches = AppGroupTraverser.findFirstGroupOfMatches(context, + Predicates.and(EntityPredicates.configEqualTo(BrooklynConfigKeys.PLAN_ID, desiredComponentId), x->true)::apply); + if (firstGroupOfMatches.isEmpty()) { + firstGroupOfMatches = AppGroupTraverser.findFirstGroupOfMatches(context, + Predicates.and(EntityPredicates.idEqualTo(desiredComponentId), x->true)::apply); + } + if (!firstGroupOfMatches.isEmpty()) { + entity = firstGroupOfMatches.get(0); + } else { + throw new IllegalStateException("Cannot find entity with ID '"+desiredComponentId+"'"); + } + } else { + entity = context; + } + + Sensor sensor = t.sensor; + if (sensor==null) { + if (t.sensorName!=null) { + sensor = entity.getEntityType().getSensor(t.sensorName); + if (sensor==null) sensor = Sensors.newSensor(Object.class, t.sensorName); + } else { + throw new IllegalStateException("Sensor is required for a trigger"); + } + } + return Pair.of(entity, sensor); + }).collect(Collectors.toList()); + } + + protected boolean hasTriggers(ConfigBag config) { + Maybe<Object> triggers = getTriggersMaybe(null, config); + if (triggers==null || triggers.isAbsent()) return false; + if (triggers.get() instanceof Collection && ((Collection)triggers.get()).isEmpty()) return false; + return true; + } + + public static class SensorFeedTrigger { + Entity entity; + @JsonIgnore + String entityId; + Sensor<?> sensor; + @JsonIgnore + String sensorName; + + // TODO could support predicates on the value + + public void setEntity(Entity 
entity) { + this.entity = entity; + } + public void setEntity(String entityId) { + this.entityId = entityId; + } + public Object getEntity() { + return entity!=null ? entity : entityId; + } + + public void setSensor(Sensor<?> sensor) { + this.sensor = sensor; + } + public void setSensor(String sensorName) { + this.sensorName = sensorName; + } + public Object getSensor() { + return sensor!=null ? sensor : sensorName; + } + } + + + protected void standardPollConfig(Entity entity, ConfigBag configBag, PollConfig<?,?,?> poll) { + final Boolean suppressDuplicates = EntityInitializers.resolve(configBag, SUPPRESS_DUPLICATES); + final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME_ON_STARTUP); + final Duration logWarningGraceTime = EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME); + + poll.suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates)) + .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup) + .logWarningGraceTime(logWarningGraceTime) + .period(getPeriod(entity, initParams())) + .otherTriggers(getTriggersMaybe(entity, configBag).orNull()); + } + +} diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java index 646ca82d22..e078eaffbe 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java @@ -29,6 +29,7 @@ import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.config.MapConfigKey; import org.apache.brooklyn.core.entity.EntityInitializers; import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed; +import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; import org.apache.brooklyn.core.sensor.ssh.SshCommandSensor; import org.apache.brooklyn.feed.http.HttpFeed; import org.apache.brooklyn.feed.http.HttpPollConfig; @@ 
-58,7 +59,7 @@ import net.minidev.json.JSONObject; * @see SshCommandSensor */ @Beta -public class HttpRequestSensor<T> extends AbstractAddSensorFeed<T> { +public class HttpRequestSensor<T> extends AbstractAddTriggerableSensor<T> { private static final Logger LOG = LoggerFactory.getLogger(HttpRequestSensor.class); @@ -126,11 +127,9 @@ public class HttpRequestSensor<T> extends AbstractAddSensorFeed<T> { HttpPollConfig<T> pollConfig = new HttpPollConfig<T>(sensor) .checkSuccess(HttpValueFunctions.responseCodeEquals(200)) .onFailureOrException(Functions.constant((T) null)) - .onSuccess(successFunction) - .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates)) - .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup) - .logWarningGraceTime(logWarningGraceTime) - .period(initParam(SENSOR_PERIOD)); + .onSuccess(successFunction); + + standardPollConfig(entity, initParams(), pollConfig); HttpFeed.Builder httpRequestBuilder = HttpFeed.builder().entity(entity) .baseUri(uri) diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java index cdc9e1d083..cdc77112ad 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java @@ -39,6 +39,7 @@ import org.apache.brooklyn.core.entity.EntityInitializers; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.location.Locations; import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed; +import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; import org.apache.brooklyn.core.sensor.http.HttpRequestSensor; import org.apache.brooklyn.feed.CommandPollConfig; import org.apache.brooklyn.feed.ssh.SshFeed; @@ -77,7 +78,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * @see HttpRequestSensor */ @Beta -public final class SshCommandSensor<T> extends 
AbstractAddSensorFeed<T> { +public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> { private static final Logger LOG = LoggerFactory.getLogger(SshCommandSensor.class); @@ -150,24 +151,17 @@ public final class SshCommandSensor<T> extends AbstractAddSensorFeed<T> { LOG.debug("Adding SSH sensor {} to {}", name, entity); } - final Boolean suppressDuplicates = EntityInitializers.resolve(params, SUPPRESS_DUPLICATES); - final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME_ON_STARTUP); - final Duration logWarningGraceTime = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME); - Supplier<Map<String,String>> envSupplier = new EnvSupplier(entity, params); - Supplier<String> commandSupplier = new CommandSupplier(entity, params); CommandPollConfig<T> pollConfig = new CommandPollConfig<T>(sensor) - .period(initParam(SENSOR_PERIOD)) .env(envSupplier) .command(commandSupplier) - .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates)) .checkSuccess(SshValueFunctions.exitStatusEquals(0)) .onFailureOrException(Functions.constant((T)params.get(VALUE_ON_ERROR))) - .onSuccess(Functionals.chain(SshValueFunctions.stdout(), new CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), initParam(LAST_YAML_DOCUMENT)))) - .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup) - .logWarningGraceTime(logWarningGraceTime); + .onSuccess(Functionals.chain(SshValueFunctions.stdout(), new CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), initParam(LAST_YAML_DOCUMENT)))); + + standardPollConfig(entity, initParams(), pollConfig); SshFeed feed = SshFeed.builder() .entity(entity) diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java index 47db417054..69b8ca5ade 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java +++ 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java @@ -276,6 +276,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus private void connectAllMembersUp() { clusterOneAndAllMembersUp = FunctionFeed.builder() + .uniqueTag("one-and-all-members-up") .entity(this) .period(Duration.FIVE_SECONDS) .poll(new FunctionPollConfig<Boolean, Boolean>(CLUSTER_ONE_AND_ALL_MEMBERS_UP) diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java index a9d192ef71..7b187df98d 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java @@ -127,6 +127,7 @@ public class DynamicMultiGroupImpl extends DynamicGroupImpl implements DynamicMu Long interval = getConfig(RESCAN_INTERVAL); if (interval != null && interval > 0L) { rescan = FunctionFeed.builder() + .uniqueTag("dynamic-multi-group-scanner") .entity(this) .poll(new FunctionPollConfig<Object, Void>(RESCAN) .period(interval, TimeUnit.SECONDS) diff --git a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java index 95fe7ac82a..79b2b4dd34 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java +++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java @@ -37,6 +37,7 @@ import org.apache.brooklyn.core.feed.AttributePollHandler; import org.apache.brooklyn.core.feed.DelegatingPollHandler; import org.apache.brooklyn.core.feed.Poller; import org.apache.brooklyn.core.location.Locations; +import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; import org.apache.brooklyn.feed.ssh.SshPollValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -237,15 +238,12 @@ public abstract class AbstractCommandFeed extends 
AbstractFeed { handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this)); if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); } - - getPoller().scheduleAtFixedRate( - new Callable<SshPollValue>() { - @Override - public SshPollValue call() throws Exception { - return exec(pollInfo.command.get(), pollInfo.env.get()); - }}, - new DelegatingPollHandler<SshPollValue>(handlers), - minPeriod); + + AbstractAddTriggerableSensor.scheduleWithTriggers(this, getPoller(), new Callable<SshPollValue>() { + @Override + public SshPollValue call() throws Exception { + return exec(pollInfo.command.get(), pollInfo.env.get()); + }}, new DelegatingPollHandler(handlers), minPeriod, configs); } } diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java index 962c2c0638..f9f2a1867a 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java +++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java @@ -32,6 +32,8 @@ import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.feed.AbstractFeed; import org.apache.brooklyn.core.feed.AttributePollHandler; import org.apache.brooklyn.core.feed.DelegatingPollHandler; +import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; +import org.apache.brooklyn.util.http.HttpToolResponse; import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,6 +104,7 @@ public class FunctionFeed extends AbstractFeed { private long period = 500; private TimeUnit periodUnits = TimeUnit.MILLISECONDS; private List<FunctionPollConfig<?,?>> polls = Lists.newArrayList(); + private String name; private String uniqueTag; private volatile boolean built; @@ -129,6 +132,10 @@ public class FunctionFeed extends AbstractFeed { polls.add(config); return this; } + public Builder name(String name) { + this.name 
= name; + return this; + } public Builder uniqueTag(String uniqueTag) { this.uniqueTag = uniqueTag; return this; @@ -182,6 +189,10 @@ public class FunctionFeed extends AbstractFeed { Callable<?> job = config.getCallable(); polls.put(new FunctionPollIdentifier(job), configCopy); } + + if (builder.name!=null) setDisplayName(builder.name); + else if (builder.uniqueTag!=null) setDisplayName(builder.uniqueTag); + config().set(POLLS, polls); initUniqueTag(builder.uniqueTag, polls.values()); } @@ -199,11 +210,8 @@ public class FunctionFeed extends AbstractFeed { handlers.add(new AttributePollHandler(config, entity, this)); if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); } - - getPoller().scheduleAtFixedRate( - (Callable)pollInfo.job, - new DelegatingPollHandler(handlers), - minPeriod); + + AbstractAddTriggerableSensor.scheduleWithTriggers(this, getPoller(), (Callable)pollInfo.job, new DelegatingPollHandler(handlers), minPeriod, configs); } } } diff --git a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java index ffe690af44..95a11a2a24 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java +++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java @@ -34,6 +34,7 @@ import org.apache.brooklyn.util.javalang.JavaClassNames; public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfig<S, T>> { + private String name; private Callable<?> callable; public static <T> FunctionPollConfig<?, T> forSensor(AttributeSensor<T> sensor) { @@ -51,12 +52,18 @@ public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfi public FunctionPollConfig(FunctionPollConfig<S, T> other) { super(other); callable = other.callable; + name = other.name; } public Callable<? 
extends Object> getCallable() { return callable; } - + + public FunctionPollConfig<S, T> name(String name) { + this.name = name; + return this; + } + /** * The {@link Callable} to be invoked on each poll. * <p> @@ -108,6 +115,7 @@ public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfi @Override protected String toStringBaseName() { return "fn"; } @Override protected String toStringPollSource() { + if (name!=null) return name; if (callable==null) return null; String cs = callable.toString(); if (!cs.contains( ""+Integer.toHexString(callable.hashCode()) )) { diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java index 2f0247bc8b..4d1a13ffa1 100644 --- a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java +++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java @@ -37,6 +37,7 @@ import org.apache.brooklyn.core.feed.Poller; import org.apache.brooklyn.core.location.Locations; import org.apache.brooklyn.core.location.Machines; import org.apache.brooklyn.core.location.internal.LocationInternal; +import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; import org.apache.brooklyn.util.core.javalang.BrooklynHttpConfig; import org.apache.brooklyn.util.executor.HttpExecutorFactory; import org.apache.brooklyn.util.guava.Maybe; @@ -416,8 +417,10 @@ public class HttpFeed extends AbstractFeed { .config(BrooklynHttpConfig.httpConfigBuilder(getEntity()).build()) .build()); return createHttpToolRespose(response, startTime); - }}; - getPoller().scheduleAtFixedRate(pollJob, new DelegatingPollHandler<HttpToolResponse>(handlers), minPeriod); + } + }; + + AbstractAddTriggerableSensor.scheduleWithTriggers(this, getPoller(), pollJob, new DelegatingPollHandler<HttpToolResponse>(handlers), minPeriod, configs); } } diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java 
b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java index 623e8f4090..faaff23662 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java @@ -291,6 +291,7 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft protected void connectServiceUpIsRunning() { Duration period = config().get(SERVICE_PROCESS_IS_RUNNING_POLL_PERIOD); serviceProcessIsRunning = FunctionFeed.builder() + .uniqueTag("check-service-process-is-running") .entity(this) .period(period) .poll(new FunctionPollConfig<Boolean, Boolean>(SERVICE_PROCESS_IS_RUNNING) diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java index 130c25b963..474761c724 100644 --- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java +++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java @@ -22,7 +22,9 @@ import com.google.common.collect.Lists; import com.google.common.reflect.TypeToken; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.config.MapConfigKey; import org.apache.brooklyn.core.config.SetConfigKey; +import org.apache.brooklyn.core.entity.BrooklynConfigKeys; import org.apache.brooklyn.util.time.Duration; import java.util.List; @@ -42,6 +44,8 @@ public interface ContainerCommons { ConfigKey<List> COMMAND = ConfigKeys.newConfigKey(List.class,"command", "Single command and optional arguments to execute for the container (overrides image EntryPoint and Cmd)", Lists.newArrayList()); ConfigKey<List> ARGUMENTS = ConfigKeys.newConfigKey(List.class,"args", "Additional arguments to pass to the command at the container (in addition to the 
command supplied here or the default in the image)", Lists.newArrayList()); + MapConfigKey<Object> SHELL_ENVIRONMENT = BrooklynConfigKeys.SHELL_ENVIRONMENT; + ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "timeout", "Container execution timeout (default 5 minutes)", Duration.minutes(5)); ConfigKey<Boolean> REQUIRE_EXIT_CODE_ZERO = ConfigKeys.newConfigKey(Boolean.class, "requireExitCodeZero", "Whether task should fail if container returns non-zero exit code (default true)", true); diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java index d35cac56d9..dbcf071622 100644 --- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java +++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java @@ -18,29 +18,29 @@ */ package org.apache.brooklyn.tasks.kubectl; +import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.entity.EntityInitializers; import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed; +import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; import org.apache.brooklyn.core.sensor.ssh.SshCommandSensor; import org.apache.brooklyn.feed.function.FunctionFeed; import org.apache.brooklyn.feed.function.FunctionPollConfig; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.task.DynamicTasks; -import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import static 
org.apache.brooklyn.core.mgmt.BrooklynTaskTags.SENSOR_TAG; @SuppressWarnings({"UnstableApiUsage", "deprecation", "unchecked"}) -public class ContainerSensor<T> extends AbstractAddSensorFeed<T> implements ContainerCommons { +public class ContainerSensor<T> extends AbstractAddTriggerableSensor<T> implements ContainerCommons { public static final ConfigKey<String> FORMAT = SshCommandSensor.FORMAT; public static final ConfigKey<Boolean> LAST_YAML_DOCUMENT = SshCommandSensor.LAST_YAML_DOCUMENT; @@ -63,34 +63,42 @@ public class ContainerSensor<T> extends AbstractAddSensorFeed<T> implements Cont ConfigBag configBag = ConfigBag.newInstanceCopying(initParams()); - final Boolean suppressDuplicates = EntityInitializers.resolve(configBag, SUPPRESS_DUPLICATES); - final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME_ON_STARTUP); - final Duration logWarningGraceTime = EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME); + FunctionPollConfig<Object, String> poll = new FunctionPollConfig<>(sensor) + .callable(new ContainerSensorCallable(entity, configBag, sensor)); + standardPollConfig(entity, configBag, poll); - ((EntityInternal)entity).feeds().add(FunctionFeed.builder() + ((EntityInternal) entity).feeds().add(FunctionFeed.builder() .entity(entity) - .period(initParam(SENSOR_PERIOD)) .onlyIfServiceUp() - .poll(new FunctionPollConfig<>(sensor) - .callable(new Callable<Object>() { - @Override - public Object call() throws Exception { - Task<ContainerTaskResult> containerTask = ContainerTaskFactory.newInstance() - .summary("Running " + EntityInitializers.resolve(configBag, SENSOR_NAME)) - .jobIdentifier(entity.getApplication()+"-"+entity.getId() + "-" + SENSOR_TAG) - .configure(configBag.getAllConfig()) - .newTask(); - DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity); - String mainStdout = containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES)).getMainStdout(); - return (new 
SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))).apply(mainStdout); - } - }) - .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates)) - .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup) - .logWarningGraceTime(logWarningGraceTime)) + .poll(poll) .build()); } + public static class ContainerSensorCallable implements Callable<Object> { + private final Entity entity; + private final ConfigBag configBag; + private final Sensor<?> sensor; -} + public ContainerSensorCallable(Entity entity, ConfigBag configBag, Sensor<?> sensor) { + this.entity = entity; + this.configBag = configBag; + this.sensor = sensor; + } + public Object call() throws Exception { + Task<ContainerTaskResult> containerTask = ContainerTaskFactory.newInstance() + .summary("Running " + EntityInitializers.resolve(configBag, SENSOR_NAME)) + .jobIdentifier(entity.getApplication() + "-" + entity.getId() + "-" + SENSOR_TAG) + .configure(configBag.getAllConfig()) + .newTask(); + DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity); + String mainStdout = containerTask.getUnchecked(configBag.get(TIMEOUT)).getMainStdout(); + return (new SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(), configBag.get(FORMAT), configBag.get(LAST_YAML_DOCUMENT))).apply(mainStdout); + } + @Override + public String toString() { + return "container-sensor[" + configBag.get(CONTAINER_IMAGE) + "]"; + } + } + +} diff --git a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java index 84cb436b4b..310888cf08 100644 --- a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java +++ b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java @@ -23,7 +23,6 @@ import com.google.gson.Gson; import org.apache.brooklyn.api.entity.Entity; import 
org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.TaskAdaptable; -import org.apache.brooklyn.core.entity.BrooklynConfigKeys; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityInitializers; import org.apache.brooklyn.core.entity.EntityInternal; @@ -124,7 +123,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp LOG.debug("Submitting container job in namespace "+namespace+", name "+kubeJobName); - Map<String, String> env = new ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config, BrooklynConfigKeys.SHELL_ENVIRONMENT)); + Map<String, String> env = new ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config, SHELL_ENVIRONMENT)); final BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> jobYaml = new KubeJobFileCreator() .withImage(containerImage) .withImagePullPolicy(containerImagePullPolicy) @@ -578,7 +577,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp return null; } - LOG.info("Deleting namespace " + namespace); + LOG.debug("Deleting namespace " + namespace); // do this not as a subtask so we can run even if the main queue fails ProcessTaskFactory<String> tf = newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, namespace)).summary("Tear down containers").allowingNonZeroExitCode(); if (!requireSuccess) tf = tf.allowingNonZeroExitCode(); @@ -611,7 +610,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp return environmentVariablesRaw(map); } public T environmentVariablesRaw(Map<String,?> map) { - config.put(BrooklynConfigKeys.SHELL_ENVIRONMENT, MutableMap.copyOf( map ) ); + config.put(SHELL_ENVIRONMENT, MutableMap.copyOf( map ) ); return self(); } @@ -620,7 +619,7 @@ public class ContainerTaskFactory<T extends ContainerTaskFactory<T,RET>,RET> imp return 
this.environmentVariableRaw(key, (Object)val); } public T environmentVariableRaw(String key, Object val) { - return environmentVariablesRaw(MutableMap.copyOf( config.get(BrooklynConfigKeys.SHELL_ENVIRONMENT) ).add(key, val)); + return environmentVariablesRaw(MutableMap.copyOf( config.get(SHELL_ENVIRONMENT) ).add(key, val)); } @Override diff --git a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java index 5de54807d3..679162d49e 100644 --- a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java +++ b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java @@ -21,13 +21,20 @@ package org.apache.brooklyn.tasks.kubectl; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.Dumper; import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.sensor.DependentConfiguration; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; import org.testng.annotations.Test; @SuppressWarnings( "UnstableApiUsage") @@ -123,4 +130,30 @@ public class ContainerSensorTest extends BrooklynAppUnitTestSupport { EntityAsserts.assertAttributeEqualsEventually(parentEntity, Attributes.SERVICE_UP, true); 
EntityAsserts.assertAttributeEventually(parentEntity, Sensors.newStringSensor("tf-version-sensor"), s -> s.contains("Terraform")); } + + @Test + public void testTriggeredContainerSensor() { + AttributeSensor<Object> trigger = Sensors.newSensor(Object.class, "the-trigger"); + AttributeSensor<Object> triggered = Sensors.newSensor(Object.class, "triggered"); + ConfigBag parameters = ConfigBag.newInstance(MutableMap.of( + ContainerCommons.CONTAINER_IMAGE, "stedolan/jq", + ContainerCommons.CONTAINER_IMAGE_PULL_POLICY, PullPolicy.IF_NOT_PRESENT, + ContainerCommons.SHELL_ENVIRONMENT, MutableMap.of("LAST_TRIGGER", DependentConfiguration.attributeWhenReady(app, trigger)), + ContainerCommons.BASH_SCRIPT, ImmutableList.of("echo " + "$LAST_TRIGGER" + " | jq .value"), + ContainerSensor.SENSOR_TRIGGERS, MutableList.of(MutableMap.of("entity", app.getId(), "sensor", "the-trigger")), + ContainerSensor.SENSOR_NAME, "triggered")); + + ContainerSensor<String> initializer = new ContainerSensor<>(parameters); + TestEntity child = app.createAndManageChild(EntitySpec.create(TestEntity.class).addInitializer(initializer)); + app.start(ImmutableList.of()); + + EntityAsserts.assertAttributeEquals(child, triggered, null); + app.sensors().set(trigger, "{ \"name\": \"bob\", \"value\": 3 }"); + + Time.sleep(Duration.ONE_SECOND); + Dumper.dumpInfo(app); + + EntityAsserts.assertAttributeEventuallyNonNull(child, triggered); + EntityAsserts.assertAttributeEquals(child, triggered, "3"); + } } diff --git a/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java b/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java index ca18b25048..08d9a2ec03 100644 --- a/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java +++ b/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java @@ -30,6 +30,7 @@ import org.apache.brooklyn.core.config.ConfigKeys; import 
org.apache.brooklyn.core.entity.EntityInitializers; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed; +import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; import org.apache.brooklyn.core.sensor.http.HttpRequestSensor; import org.apache.brooklyn.feed.CommandPollConfig; import org.apache.brooklyn.feed.ssh.SshValueFunctions; @@ -63,7 +64,7 @@ import com.google.common.reflect.TypeToken; * @see HttpRequestSensor */ @Beta -public final class WinRmCommandSensor<T> extends AbstractAddSensorFeed<T> { +public final class WinRmCommandSensor<T> extends AbstractAddTriggerableSensor<T> { private static final Logger LOG = LoggerFactory.getLogger(WinRmCommandSensor.class); @@ -90,10 +91,6 @@ public final class WinRmCommandSensor<T> extends AbstractAddSensorFeed<T> { LOG.debug("Adding WinRM sensor {} to {}", name, entity); } - final Boolean suppressDuplicates = EntityInitializers.resolve(initParams(), SUPPRESS_DUPLICATES); - final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(initParams(), LOG_WARNING_GRACE_TIME_ON_STARTUP); - final Duration logWarningGraceTime = EntityInitializers.resolve(initParams(), LOG_WARNING_GRACE_TIME); - Supplier<Map<String,String>> envSupplier = new Supplier<Map<String,String>>() { @SuppressWarnings("serial") @Override @@ -127,16 +124,15 @@ public final class WinRmCommandSensor<T> extends AbstractAddSensorFeed<T> { .period(initParam(SENSOR_PERIOD)) .env(envSupplier) .command(commandSupplier) - .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates)) .checkSuccess(SshValueFunctions.exitStatusEquals(0)) .onFailureOrException(Functions.constant((T) null)) .onSuccess(Functions.compose(new Function<String, T>() { @Override public T apply(String input) { return TypeCoercions.coerce(Strings.trimEnd(input), (Class<T>) sensor.getType()); - }}, SshValueFunctions.stdout())) - .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup) - 
.logWarningGraceTime(logWarningGraceTime); + }}, SshValueFunctions.stdout())); + + standardPollConfig(entity, initParams(), pollConfig); CmdFeed feed = CmdFeed.builder() .entity(entity)
