This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 86e75d683ec25c505ce63ae270b02f5693d93e59
Author: Alex Heneveld <[email protected]>
AuthorDate: Tue Aug 16 12:05:04 2022 +0100

    allow common feeds to be triggered from sensors
    
    also improve names/display/unique-tag of some common sensors
---
 .../apache/brooklyn/core/feed/AbstractFeed.java    |   7 +-
 .../brooklyn/core/feed/AttributePollHandler.java   |   2 +-
 .../org/apache/brooklyn/core/feed/PollConfig.java  |  19 +-
 .../java/org/apache/brooklyn/core/feed/Poller.java | 118 +++++++++---
 .../core/sensor/AbstractAddSensorFeed.java         |  17 ++
 .../core/sensor/AbstractAddTriggerableSensor.java  | 213 +++++++++++++++++++++
 .../core/sensor/http/HttpRequestSensor.java        |  11 +-
 .../brooklyn/core/sensor/ssh/SshCommandSensor.java |  16 +-
 .../brooklyn/entity/group/DynamicClusterImpl.java  |   1 +
 .../entity/group/DynamicMultiGroupImpl.java        |   1 +
 .../apache/brooklyn/feed/AbstractCommandFeed.java  |  16 +-
 .../brooklyn/feed/function/FunctionFeed.java       |  18 +-
 .../brooklyn/feed/function/FunctionPollConfig.java |  10 +-
 .../org/apache/brooklyn/feed/http/HttpFeed.java    |   7 +-
 .../entity/software/base/SoftwareProcessImpl.java  |   1 +
 .../brooklyn/tasks/kubectl/ContainerCommons.java   |   4 +
 .../brooklyn/tasks/kubectl/ContainerSensor.java    |  62 +++---
 .../tasks/kubectl/ContainerTaskFactory.java        |   9 +-
 .../tasks/kubectl/ContainerSensorTest.java         |  33 ++++
 .../core/sensor/windows/WinRmCommandSensor.java    |  14 +-
 20 files changed, 471 insertions(+), 108 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java 
b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
index 70ef397ac1..7a50fff4db 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AbstractFeed.java
@@ -201,7 +201,6 @@ public abstract class AbstractFeed extends 
AbstractEntityAdjunct implements Feed
     
     @Override
     protected void onChanged() {
-        // TODO Auto-generated method stub
     }
 
     /**
@@ -209,7 +208,7 @@ public abstract class AbstractFeed extends 
AbstractEntityAdjunct implements Feed
      */
     protected void preStart() {
     }
-    
+
     /**
      * For overriding.
      */
@@ -233,6 +232,10 @@ public abstract class AbstractFeed extends 
AbstractEntityAdjunct implements Feed
         highlightTriggers("Running every "+minPeriod);
     }
 
+    public void highlightTriggers(String message) {
+        super.highlightTriggers(message);
+    }
+
     void onRemoveSensor(Sensor<?> sensor) {
         highlightActionPublishSensor("Clear sensor "+sensor.getName());
     }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java 
b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
index c59916984d..84e4d852ba 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/AttributePollHandler.java
@@ -233,7 +233,7 @@ public class AttributePollHandler<V> implements 
PollHandler<V> {
     
     @Override
     public String getDescription() {
-        return sensor.getName()+" @ "+entity.getId()+" <- "+config;
+        return sensor.getName() + " " /*+entity.getId()*/ +" <- " + config;
     }
     
     protected String getBriefDescription() {
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java 
b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
index d9990538e5..7c8144811d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/PollConfig.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.util.time.Duration;
 public class PollConfig<V, T, F extends PollConfig<V, T, F>> extends 
FeedConfig<V, T, F> {
 
     private long period = -1;
+    private Object otherTriggers;
     private String description;
 
     public PollConfig(AttributeSensor<T> sensor) {
@@ -43,6 +44,8 @@ public class PollConfig<V, T, F extends PollConfig<V, T, F>> 
extends FeedConfig<
     public PollConfig(PollConfig<V,T,F> other) {
         super(other);
         this.period = other.period;
+        this.otherTriggers = other.otherTriggers;
+        this.description = other.description;
     }
 
     public long getPeriod() {
@@ -69,13 +72,25 @@ public class PollConfig<V, T, F extends PollConfig<V, T, 
F>> extends FeedConfig<
         this.description = description;
         return self();
     }
-    
+
+    public F otherTriggers(Object otherTriggers) {
+        this.otherTriggers = otherTriggers;
+        return self();
+    }
+
+    public Object getOtherTriggers() {
+        return otherTriggers;
+    }
+
     public String getDescription() {
         return description;
     }
     
     @Override protected MutableList<Object> toStringOtherFields() {
-        return super.toStringOtherFields().appendIfNotNull(description);
+        MutableList<Object> result = 
super.toStringOtherFields().appendIfNotNull(description);
+        if (period>0 && period <= 
Duration.PRACTICALLY_FOREVER.toMilliseconds()) result.append("period: 
"+Duration.of(period));
+        if (otherTriggers!=null) result.append("triggers: "+otherTriggers);
+        return result;
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java 
b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index 6049ecf0e5..997c34fcf3 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -23,14 +23,20 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 
 import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.DynamicSequentialTask;
 import org.apache.brooklyn.util.core.task.ScheduledTask;
 import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,11 +67,20 @@ public class Poller<V> {
         final PollHandler<? super V> handler;
         final Duration pollPeriod;
         final Runnable wrappedJob;
+        final Entity sensorSource;
+        final Sensor<?> sensor;
+        SubscriptionHandle subscription;
         private boolean loggedPreviousException = false;
-        
+
         PollJob(final Callable<V> job, final PollHandler<? super V> handler, 
Duration period) {
+            this(job, handler, period, null, null);
+        }
+
+        PollJob(final Callable<V> job, final PollHandler<? super V> handler, 
Duration period, Entity sensorSource, Sensor<?> sensor) {
             this.handler = handler;
             this.pollPeriod = period;
+            this.sensorSource = sensorSource;
+            this.sensor = sensor;
             
             wrappedJob = new Runnable() {
                 @Override
@@ -122,11 +137,12 @@ public class Poller<V> {
         pollJobs.add(foo);
     }
 
+    public void subscribe(Callable<V> job, PollHandler<? super V> handler, 
Entity sensorSource, Sensor<?> sensor) {
+        pollJobs.add(new PollJob<V>(job, handler, null, sensorSource, sensor));
+    }
+
     @SuppressWarnings({ "unchecked" })
     public void start() {
-        // TODO Previous incarnation of this logged this logged 
polledSensors.keySet(), but we don't know that anymore
-        // Is that ok, are can we do better?
-        
         if (log.isDebugEnabled()) log.debug("Starting poll for {} (using {})", 
new Object[] {entity, this});
         if (started) { 
             throw new IllegalStateException(String.format("Attempt to start 
poller %s of entity %s when already running", 
@@ -141,26 +157,32 @@ public class Poller<V> {
         }
         
         Duration minPeriod = null;
+        Set<String> sensors = MutableSet.of();
         for (final PollJob<V> pollJob : pollJobs) {
-            final String scheduleName = pollJob.handler.getDescription();
-            if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
-                ScheduledTask t = ScheduledTask.builder(() -> {
-                            DynamicSequentialTask<Void> task = new 
DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, 
"entity", entity), 
-                                new Callable<Void>() { @Override public Void 
call() {
-                                    if (!Entities.isManagedActive(entity)) {
-                                        return null;
-                                    }
-                                    if (onlyIfServiceUp && 
!Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
-                                        return null;
-                                    }
-                                    pollJob.wrappedJob.run();
-                                    return null; 
-                                } } );
-                            // explicitly make non-transient -- we want to see 
its execution, even if parent is transient
-                            BrooklynTaskTags.addTagDynamically(task, 
BrooklynTaskTags.NON_TRANSIENT_TASK_TAG);
-                            return task;
-                        })
-                        .displayName("scheduled:" + scheduleName)
+            final String scheduleName = (feed!=null ? feed.getDisplayName()+", 
" : "") +pollJob.handler.getDescription();
+            boolean added = false;
+
+            Callable<Task<?>> tf = () -> {
+                DynamicSequentialTask<Void> task = new 
DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, 
"entity", entity),
+                        new Callable<Void>() { @Override public Void call() {
+                            if (!Entities.isManagedActive(entity)) {
+                                return null;
+                            }
+                            if (onlyIfServiceUp && 
!Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
+                                return null;
+                            }
+                            pollJob.wrappedJob.run();
+                            return null;
+                        } } );
+                // explicitly make non-transient -- we want to see its 
execution, even if parent is transient
+                BrooklynTaskTags.addTagDynamically(task, 
BrooklynTaskTags.NON_TRANSIENT_TASK_TAG);
+                return task;
+            };
+
+            if (pollJob.pollPeriod!=null && 
pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
+                added =true;
+                ScheduledTask t = ScheduledTask.builder(tf)
+                        .displayName("Periodic: " + scheduleName)
                         .period(pollJob.pollPeriod)
                         .cancelOnException(false)
                         .tag(feed!=null ? 
BrooklynTaskTags.tagForContextAdjunct(feed) : null)
@@ -169,13 +191,43 @@ public class Poller<V> {
                 if (minPeriod==null || 
(pollJob.pollPeriod.isShorterThan(minPeriod))) {
                     minPeriod = pollJob.pollPeriod;
                 }
-            } else {
-                if (log.isDebugEnabled()) log.debug("Activating poll (but 
leaving off, as period {}) for {} (using {})", new Object[] 
{pollJob.pollPeriod, entity, this});
+            }
+
+            if (pollJob.sensor!=null) {
+                added = true;
+                if (pollJob.subscription!=null) {
+                    throw new IllegalStateException(String.format("Attempt to 
start poller %s of entity %s when already has subscription %s",
+                            this, entity, pollJob.subscription));
+                }
+                sensors.add(pollJob.sensor.getName());
+                pollJob.subscription = 
feed.subscriptions().subscribe(pollJob.sensorSource!=null ? 
pollJob.sensorSource : feed.getEntity(), pollJob.sensor, event -> {
+                    // submit this on every event
+                    try {
+                        feed.getExecutionContext().submit(tf.call());
+                    } catch (Exception e) {
+                        throw Exceptions.propagate(e);
+                    }
+                });
+            }
+
+            if (!added) {
+                if (log.isDebugEnabled()) log.debug("Activating poll (but 
leaving off, as period {} and no subscriptions) for {} (using {})", new 
Object[] {pollJob.pollPeriod, entity, this});
             }
         }
         
-        if (minPeriod!=null && feed!=null) {
-            feed.highlightTriggerPeriod(minPeriod);
+        if (feed!=null) {
+            if (sensors.isEmpty()) {
+                if (minPeriod==null) {
+                    feed.highlightTriggers("Not configured with a period or 
triggers");
+                } else {
+                    feed.highlightTriggerPeriod(minPeriod);
+                }
+            } else if (minPeriod==null) {
+                feed.highlightTriggers("Triggered by: "+sensors);
+            } else {
+                // both
+                feed.highlightTriggers("Running every "+minPeriod+" and on 
triggers: "+sensors);
+            }
         }
     }
     
@@ -193,6 +245,12 @@ public class Poller<V> {
         for (ScheduledTask task : tasks) {
             if (task != null) task.cancel();
         }
+        for (PollJob<?> j: pollJobs) {
+            if (j.subscription!=null) {
+                feed.subscriptions().unsubscribe(j.subscription);
+                j.subscription = null;
+            }
+        }
         oneOffTasks.clear();
         tasks.clear();
     }
@@ -205,10 +263,14 @@ public class Poller<V> {
                 break;
             }
         }
+        boolean hasSubscriptions = pollJobs.stream().anyMatch(j -> 
j.subscription!=null);
         if (!started && hasActiveTasks) {
             log.warn("Poller should not be running, but has active tasks, 
tasks: "+tasks);
         }
-        return started && hasActiveTasks;
+        if (!started && hasSubscriptions) {
+            log.warn("Poller should not be running, but has subscriptions on 
jobs: "+pollJobs);
+        }
+        return started && (hasActiveTasks || hasSubscriptions);
     }
     
     protected boolean isEmpty() {
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java 
b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java
index d7cf001235..02dd58e0ff 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddSensorFeed.java
@@ -18,14 +18,30 @@
  */
 package org.apache.brooklyn.core.sensor;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Predicates;
+import com.google.common.reflect.TypeToken;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.effector.AddSensor;
 import org.apache.brooklyn.core.effector.AddSensorInitializer;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.EntityPredicates;
+import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser;
 import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.guava.Maybe;
 import org.apache.brooklyn.util.time.Duration;
 
 import com.google.common.annotations.Beta;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 /**
  * Super-class for entity initializers that add feeds.
@@ -55,4 +71,5 @@ public abstract class AbstractAddSensorFeed<T> extends 
AddSensorInitializer<T> {
     public AbstractAddSensorFeed(final ConfigBag params) {
         super(params);
     }
+
 }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
 
b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
new file mode 100644
index 0000000000..b443747356
--- /dev/null
+++ 
b/core/src/main/java/org/apache/brooklyn/core/sensor/AbstractAddTriggerableSensor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.sensor;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.reflect.TypeToken;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.BasicConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.effector.AddSensorInitializer;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
+import org.apache.brooklyn.core.entity.EntityInitializers;
+import org.apache.brooklyn.core.entity.EntityPredicates;
+import org.apache.brooklyn.core.feed.*;
+import org.apache.brooklyn.core.mgmt.internal.AppGroupTraverser;
+import org.apache.brooklyn.feed.http.HttpPollConfig;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.http.HttpToolResponse;
+import org.apache.brooklyn.util.javalang.AtomicReferences;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static com.fasterxml.jackson.databind.type.LogicalType.Collection;
+
+/**
+ * Super-class for entity initializers that add feeds.
+ */
+@Beta
+public abstract class AbstractAddTriggerableSensor<T> extends 
AbstractAddSensorFeed<T> {
+
+    public static final ConfigKey<Object> SENSOR_TRIGGERS = 
ConfigKeys.newConfigKey(new TypeToken<Object>() {}, "triggers",
+            "Sensors which should trigger this feed, supplied with list of 
maps containing sensor (name or sensor instance) and entity (ID or entity 
instance), or just sensor names or just one sensor");
+
+    protected AbstractAddTriggerableSensor() {}
+    public AbstractAddTriggerableSensor(ConfigBag parameters) {
+        super(parameters);
+    }
+
+    public static <V> void scheduleWithTriggers(AbstractFeed feed, Poller<V> 
poller, Callable<V> pollJob, PollHandler<V> handler, long minPeriod, Set<? 
extends PollConfig> configs) {
+        // the logic for feeds with pollers is unncessarily convoluted; for 
now we try to standardize by routing calls that take other triggers
+        // through this method; would be nice to clean up (but a big job)
+
+        if (minPeriod>0 && minPeriod < 
Duration.PRACTICALLY_FOREVER.toMilliseconds()) {
+            poller.scheduleAtFixedRate(pollJob, handler, minPeriod);
+        }
+        for (PollConfig pc: configs) {
+            if (pc.getOtherTriggers()!=null) {
+                List<Pair<Entity, Sensor>> triggersResolved = 
resolveTriggers(feed.getEntity(), pc.getOtherTriggers());
+                triggersResolved.forEach(pair -> {
+                    poller.subscribe(pollJob, handler, pair.getLeft(), 
pair.getRight());
+                });
+            }
+        }
+    }
+
+    @JsonIgnore
+    protected Duration getPeriod(Entity context, ConfigBag config) {
+        if (config.containsKey(SENSOR_PERIOD) || !hasTriggers(config)) {
+            if (context!=null) return Tasks.resolving(config, 
SENSOR_PERIOD).context(context).immediately(true).get();
+            else return config.get(SENSOR_PERIOD);
+        }
+        return Duration.PRACTICALLY_FOREVER;
+    }
+
+    @JsonIgnore
+    protected Maybe<Object> getTriggersMaybe(Entity context, ConfigBag config) 
{
+        return Tasks.resolving(config, 
SENSOR_TRIGGERS).context(context).deep().immediately(true).getMaybe();
+    }
+
+    static List<Pair<Entity,Sensor>> resolveTriggers(Entity context, Object 
otherTriggers) {
+        Object triggers = Tasks.resolving(otherTriggers, 
Object.class).context(context).deep().immediately(true).get();
+
+        if (triggers==null || (triggers instanceof Collection && 
((Collection)triggers).isEmpty())) return Collections.emptyList();
+        if (triggers instanceof String) {
+            SensorFeedTrigger t = new SensorFeedTrigger();
+            t.sensorName = (String)triggers;
+            triggers = MutableList.of(t);
+        }
+        if (!(triggers instanceof Collection)) {
+            throw new IllegalStateException("Triggers should be a list 
containing sensors or sensor names");
+        }
+
+        return ((Collection<?>)triggers).stream().map(ti -> {
+            SensorFeedTrigger t;
+
+            if (ti instanceof SensorFeedTrigger) {
+                t = (SensorFeedTrigger) ti;
+            } else {
+                if (ti instanceof Map) {
+                    t = Tasks.resolving(ti, 
SensorFeedTrigger.class).context(context).deep().get();
+                } else if (ti instanceof String) {
+                    t = new SensorFeedTrigger();
+                    t.sensorName = (String) ti;
+                } else {
+                    throw new IllegalStateException("Trigger should be a map 
specifyin entity and sensor");
+                }
+            }
+
+            Entity entity = t.entity;
+            if (entity==null && t.entityId!=null) {
+                String desiredComponentId = t.entityId;
+                List<Entity> firstGroupOfMatches = 
AppGroupTraverser.findFirstGroupOfMatches(context,
+                        
Predicates.and(EntityPredicates.configEqualTo(BrooklynConfigKeys.PLAN_ID, 
desiredComponentId), x->true)::apply);
+                if (firstGroupOfMatches.isEmpty()) {
+                    firstGroupOfMatches = 
AppGroupTraverser.findFirstGroupOfMatches(context,
+                            
Predicates.and(EntityPredicates.idEqualTo(desiredComponentId), x->true)::apply);
+                }
+                if (!firstGroupOfMatches.isEmpty()) {
+                    entity = firstGroupOfMatches.get(0);
+                } else {
+                    throw new IllegalStateException("Cannot find entity with 
ID '"+desiredComponentId+"'");
+                }
+            } else {
+                entity = context;
+            }
+
+            Sensor sensor = t.sensor;
+            if (sensor==null) {
+                if (t.sensorName!=null) {
+                    sensor = entity.getEntityType().getSensor(t.sensorName);
+                    if (sensor==null) sensor = Sensors.newSensor(Object.class, 
t.sensorName);
+                } else {
+                    throw new IllegalStateException("Sensor is required for a 
trigger");
+                }
+            }
+            return Pair.of(entity, sensor);
+        }).collect(Collectors.toList());
+    }
+
+    protected boolean hasTriggers(ConfigBag config) {
+        Maybe<Object> triggers = getTriggersMaybe(null, config);
+        if (triggers==null || triggers.isAbsent()) return false;
+        if (triggers.get() instanceof Collection && 
((Collection)triggers.get()).isEmpty()) return false;
+        return true;
+    }
+
+    public static class SensorFeedTrigger {
+        Entity entity;
+        @JsonIgnore
+        String entityId;
+        Sensor<?> sensor;
+        @JsonIgnore
+        String sensorName;
+
+        // TODO could support predicates on the value
+
+        public void setEntity(Entity entity) {
+            this.entity = entity;
+        }
+        public void setEntity(String entityId) {
+            this.entityId = entityId;
+        }
+        public Object getEntity() {
+            return entity!=null ? entity : entityId;
+        }
+
+        public void setSensor(Sensor<?> sensor) {
+            this.sensor = sensor;
+        }
+        public void setSensor(String sensorName) {
+            this.sensorName = sensorName;
+        }
+        public Object getSensor() {
+            return sensor!=null ? sensor : sensorName;
+        }
+    }
+
+
+    protected void standardPollConfig(Entity entity, ConfigBag configBag, 
PollConfig<?,?,?> poll) {
+        final Boolean suppressDuplicates = 
EntityInitializers.resolve(configBag, SUPPRESS_DUPLICATES);
+        final Duration logWarningGraceTimeOnStartup = 
EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME_ON_STARTUP);
+        final Duration logWarningGraceTime = 
EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME);
+
+        poll.suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
+                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
+                .logWarningGraceTime(logWarningGraceTime)
+                .period(getPeriod(entity, initParams()))
+                .otherTriggers(getTriggersMaybe(entity, configBag).orNull());
+    }
+
+}
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
 
b/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
index 646ca82d22..e078eaffbe 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/sensor/http/HttpRequestSensor.java
@@ -29,6 +29,7 @@ import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.config.MapConfigKey;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.core.sensor.ssh.SshCommandSensor;
 import org.apache.brooklyn.feed.http.HttpFeed;
 import org.apache.brooklyn.feed.http.HttpPollConfig;
@@ -58,7 +59,7 @@ import net.minidev.json.JSONObject;
  * @see SshCommandSensor
  */
 @Beta
-public class HttpRequestSensor<T> extends AbstractAddSensorFeed<T> {
+public class HttpRequestSensor<T> extends AbstractAddTriggerableSensor<T> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HttpRequestSensor.class);
 
@@ -126,11 +127,9 @@ public class HttpRequestSensor<T> extends 
AbstractAddSensorFeed<T> {
         HttpPollConfig<T> pollConfig = new HttpPollConfig<T>(sensor)
                 .checkSuccess(HttpValueFunctions.responseCodeEquals(200))
                 .onFailureOrException(Functions.constant((T) null))
-                .onSuccess(successFunction)
-                .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
-                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
-                .logWarningGraceTime(logWarningGraceTime)
-                .period(initParam(SENSOR_PERIOD));
+                .onSuccess(successFunction);
+
+        standardPollConfig(entity, initParams(), pollConfig);
 
         HttpFeed.Builder httpRequestBuilder = HttpFeed.builder().entity(entity)
                 .baseUri(uri)
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java 
b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
index cdc9e1d083..cdc77112ad 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java
@@ -39,6 +39,7 @@ import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.location.Locations;
 import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.core.sensor.http.HttpRequestSensor;
 import org.apache.brooklyn.feed.CommandPollConfig;
 import org.apache.brooklyn.feed.ssh.SshFeed;
@@ -77,7 +78,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * @see HttpRequestSensor
  */
 @Beta
-public final class SshCommandSensor<T> extends AbstractAddSensorFeed<T> {
+public final class SshCommandSensor<T> extends AbstractAddTriggerableSensor<T> 
{
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SshCommandSensor.class);
 
@@ -150,24 +151,17 @@ public final class SshCommandSensor<T> extends 
AbstractAddSensorFeed<T> {
             LOG.debug("Adding SSH sensor {} to {}", name, entity);
         }
 
-        final Boolean suppressDuplicates = EntityInitializers.resolve(params, 
SUPPRESS_DUPLICATES);
-        final Duration logWarningGraceTimeOnStartup = 
EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME_ON_STARTUP);
-        final Duration logWarningGraceTime = 
EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME);
-
         Supplier<Map<String,String>> envSupplier = new EnvSupplier(entity, 
params);
-
         Supplier<String> commandSupplier = new CommandSupplier(entity, params);
 
         CommandPollConfig<T> pollConfig = new CommandPollConfig<T>(sensor)
-                .period(initParam(SENSOR_PERIOD))
                 .env(envSupplier)
                 .command(commandSupplier)
-                .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
                 .checkSuccess(SshValueFunctions.exitStatusEquals(0))
                 
.onFailureOrException(Functions.constant((T)params.get(VALUE_ON_ERROR)))
-                .onSuccess(Functionals.chain(SshValueFunctions.stdout(), new 
CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), 
initParam(LAST_YAML_DOCUMENT))))
-                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
-                .logWarningGraceTime(logWarningGraceTime);
+                .onSuccess(Functionals.chain(SshValueFunctions.stdout(), new 
CoerceOutputFunction<>(sensor.getTypeToken(), initParam(FORMAT), 
initParam(LAST_YAML_DOCUMENT))));
+
+        standardPollConfig(entity, initParams(), pollConfig);
 
         SshFeed feed = SshFeed.builder()
                 .entity(entity)
diff --git 
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 47db417054..69b8ca5ade 100644
--- 
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@ -276,6 +276,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl 
implements DynamicClus
     
     private void connectAllMembersUp() {
         clusterOneAndAllMembersUp = FunctionFeed.builder()
+                .uniqueTag("one-and-all-members-up")
                 .entity(this)
                 .period(Duration.FIVE_SECONDS)
                 .poll(new FunctionPollConfig<Boolean, 
Boolean>(CLUSTER_ONE_AND_ALL_MEMBERS_UP)
diff --git 
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
index a9d192ef71..7b187df98d 100644
--- 
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
+++ 
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
@@ -127,6 +127,7 @@ public class DynamicMultiGroupImpl extends DynamicGroupImpl 
implements DynamicMu
         Long interval = getConfig(RESCAN_INTERVAL);
         if (interval != null && interval > 0L) {
             rescan = FunctionFeed.builder()
+                    .uniqueTag("dynamic-multi-group-scanner")
                     .entity(this)
                     .poll(new FunctionPollConfig<Object, Void>(RESCAN)
                             .period(interval, TimeUnit.SECONDS)
diff --git 
a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java 
b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
index 95fe7ac82a..79b2b4dd34 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/AbstractCommandFeed.java
@@ -37,6 +37,7 @@ import org.apache.brooklyn.core.feed.AttributePollHandler;
 import org.apache.brooklyn.core.feed.DelegatingPollHandler;
 import org.apache.brooklyn.core.feed.Poller;
 import org.apache.brooklyn.core.location.Locations;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.feed.ssh.SshPollValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -237,15 +238,12 @@ public abstract class AbstractCommandFeed extends 
AbstractFeed {
                 handlers.add(new AttributePollHandler<SshPollValue>(config, 
entity, this));
                 if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, 
config.getPeriod());
             }
-            
-            getPoller().scheduleAtFixedRate(
-                    new Callable<SshPollValue>() {
-                        @Override
-                        public SshPollValue call() throws Exception {
-                            return exec(pollInfo.command.get(), 
pollInfo.env.get());
-                        }}, 
-                    new DelegatingPollHandler<SshPollValue>(handlers),
-                    minPeriod);
+
+            AbstractAddTriggerableSensor.scheduleWithTriggers(this, 
getPoller(), new Callable<SshPollValue>() {
+                @Override
+                public SshPollValue call() throws Exception {
+                    return exec(pollInfo.command.get(), pollInfo.env.get());
+                }}, new DelegatingPollHandler(handlers), minPeriod, configs);
         }
     }
     
diff --git 
a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java 
b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
index 962c2c0638..f9f2a1867a 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionFeed.java
@@ -32,6 +32,8 @@ import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.feed.AbstractFeed;
 import org.apache.brooklyn.core.feed.AttributePollHandler;
 import org.apache.brooklyn.core.feed.DelegatingPollHandler;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
+import org.apache.brooklyn.util.http.HttpToolResponse;
 import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,6 +104,7 @@ public class FunctionFeed extends AbstractFeed {
         private long period = 500;
         private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
         private List<FunctionPollConfig<?,?>> polls = Lists.newArrayList();
+        private String name;
         private String uniqueTag;
         private volatile boolean built;
 
@@ -129,6 +132,10 @@ public class FunctionFeed extends AbstractFeed {
             polls.add(config);
             return this;
         }
+        public Builder name(String name) {
+            this.name = name;
+            return this;
+        }
         public Builder uniqueTag(String uniqueTag) {
             this.uniqueTag = uniqueTag;
             return this;
@@ -182,6 +189,10 @@ public class FunctionFeed extends AbstractFeed {
             Callable<?> job = config.getCallable();
             polls.put(new FunctionPollIdentifier(job), configCopy);
         }
+
+        if (builder.name!=null) setDisplayName(builder.name);
+        else if (builder.uniqueTag!=null) setDisplayName(builder.uniqueTag);
+
         config().set(POLLS, polls);
         initUniqueTag(builder.uniqueTag, polls.values());
     }
@@ -199,11 +210,8 @@ public class FunctionFeed extends AbstractFeed {
                 handlers.add(new AttributePollHandler(config, entity, this));
                 if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, 
config.getPeriod());
             }
-            
-            getPoller().scheduleAtFixedRate(
-                    (Callable)pollInfo.job,
-                    new DelegatingPollHandler(handlers), 
-                    minPeriod);
+
+            AbstractAddTriggerableSensor.scheduleWithTriggers(this, 
getPoller(), (Callable)pollInfo.job, new DelegatingPollHandler(handlers), 
minPeriod, configs);
         }
     }
 }
diff --git 
a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java 
b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
index ffe690af44..95a11a2a24 100644
--- 
a/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
+++ 
b/core/src/main/java/org/apache/brooklyn/feed/function/FunctionPollConfig.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.util.javalang.JavaClassNames;
 
 public class FunctionPollConfig<S, T> extends PollConfig<S, T, 
FunctionPollConfig<S, T>> {
 
+    private String name;
     private Callable<?> callable;
     
     public static <T> FunctionPollConfig<?, T> forSensor(AttributeSensor<T> 
sensor) {
@@ -51,12 +52,18 @@ public class FunctionPollConfig<S, T> extends PollConfig<S, 
T, FunctionPollConfi
     public FunctionPollConfig(FunctionPollConfig<S, T> other) {
         super(other);
         callable = other.callable;
+        name = other.name;
     }
     
     public Callable<? extends Object> getCallable() {
         return callable;
     }
-    
+
+    public FunctionPollConfig<S, T> name(String name) {
+        this.name = name;
+        return this;
+    }
+
     /**
      * The {@link Callable} to be invoked on each poll.
      * <p>
@@ -108,6 +115,7 @@ public class FunctionPollConfig<S, T> extends PollConfig<S, 
T, FunctionPollConfi
 
     @Override protected String toStringBaseName() { return "fn"; }
     @Override protected String toStringPollSource() {
+        if (name!=null) return name;
         if (callable==null) return null;
         String cs = callable.toString();
         if (!cs.contains( ""+Integer.toHexString(callable.hashCode()) )) {
diff --git a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java 
b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
index 2f0247bc8b..4d1a13ffa1 100644
--- a/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
+++ b/core/src/main/java/org/apache/brooklyn/feed/http/HttpFeed.java
@@ -37,6 +37,7 @@ import org.apache.brooklyn.core.feed.Poller;
 import org.apache.brooklyn.core.location.Locations;
 import org.apache.brooklyn.core.location.Machines;
 import org.apache.brooklyn.core.location.internal.LocationInternal;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.util.core.javalang.BrooklynHttpConfig;
 import org.apache.brooklyn.util.executor.HttpExecutorFactory;
 import org.apache.brooklyn.util.guava.Maybe;
@@ -416,8 +417,10 @@ public class HttpFeed extends AbstractFeed {
                             
.config(BrooklynHttpConfig.httpConfigBuilder(getEntity()).build())
                             .build());
                     return createHttpToolRespose(response, startTime);
-                }};
-                getPoller().scheduleAtFixedRate(pollJob, new 
DelegatingPollHandler<HttpToolResponse>(handlers), minPeriod);
+                }
+            };
+
+            AbstractAddTriggerableSensor.scheduleWithTriggers(this, 
getPoller(), pollJob, new DelegatingPollHandler<HttpToolResponse>(handlers), 
minPeriod, configs);
         }
     }
 
diff --git 
a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java
 
b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java
index 623e8f4090..faaff23662 100644
--- 
a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java
+++ 
b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java
@@ -291,6 +291,7 @@ public abstract class SoftwareProcessImpl extends 
AbstractEntity implements Soft
     protected void connectServiceUpIsRunning() {
         Duration period = config().get(SERVICE_PROCESS_IS_RUNNING_POLL_PERIOD);
         serviceProcessIsRunning = FunctionFeed.builder()
+                .uniqueTag("check-service-process-is-running")
                 .entity(this)
                 .period(period)
                 .poll(new FunctionPollConfig<Boolean, 
Boolean>(SERVICE_PROCESS_IS_RUNNING)
diff --git 
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
 
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
index 130c25b963..474761c724 100644
--- 
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
+++ 
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerCommons.java
@@ -22,7 +22,9 @@ import com.google.common.collect.Lists;
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.config.MapConfigKey;
 import org.apache.brooklyn.core.config.SetConfigKey;
+import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
 import org.apache.brooklyn.util.time.Duration;
 
 import java.util.List;
@@ -42,6 +44,8 @@ public interface ContainerCommons {
     ConfigKey<List> COMMAND = ConfigKeys.newConfigKey(List.class,"command", 
"Single command and optional arguments to execute for the container (overrides 
image EntryPoint and Cmd)", Lists.newArrayList());
     ConfigKey<List> ARGUMENTS = ConfigKeys.newConfigKey(List.class,"args", 
"Additional arguments to pass to the command at the container (in addition to 
the command supplied here or the default in the image)", Lists.newArrayList());
 
+    MapConfigKey<Object> SHELL_ENVIRONMENT = 
BrooklynConfigKeys.SHELL_ENVIRONMENT;
+
     ConfigKey<Duration> TIMEOUT = ConfigKeys.newConfigKey(Duration.class, 
"timeout", "Container execution timeout (default 5 minutes)", 
Duration.minutes(5));
 
     ConfigKey<Boolean> REQUIRE_EXIT_CODE_ZERO = 
ConfigKeys.newConfigKey(Boolean.class, "requireExitCodeZero", "Whether task 
should fail if container returns non-zero exit code (default true)", true);
diff --git 
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
 
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
index d35cac56d9..dbcf071622 100644
--- 
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
+++ 
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerSensor.java
@@ -18,29 +18,29 @@
  */
 package org.apache.brooklyn.tasks.kubectl;
 
+import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.core.sensor.ssh.SshCommandSensor;
 import org.apache.brooklyn.feed.function.FunctionFeed;
 import org.apache.brooklyn.feed.function.FunctionPollConfig;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.brooklyn.core.mgmt.BrooklynTaskTags.SENSOR_TAG;
 
 @SuppressWarnings({"UnstableApiUsage", "deprecation", "unchecked"})
-public class ContainerSensor<T> extends AbstractAddSensorFeed<T> implements 
ContainerCommons {
+public class ContainerSensor<T> extends AbstractAddTriggerableSensor<T> 
implements ContainerCommons {
 
     public static final ConfigKey<String> FORMAT = SshCommandSensor.FORMAT;
     public static final ConfigKey<Boolean> LAST_YAML_DOCUMENT = 
SshCommandSensor.LAST_YAML_DOCUMENT;
@@ -63,34 +63,42 @@ public class ContainerSensor<T> extends 
AbstractAddSensorFeed<T> implements Cont
 
         ConfigBag configBag = ConfigBag.newInstanceCopying(initParams());
 
-        final Boolean suppressDuplicates = 
EntityInitializers.resolve(configBag, SUPPRESS_DUPLICATES);
-        final Duration logWarningGraceTimeOnStartup = 
EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME_ON_STARTUP);
-        final Duration logWarningGraceTime = 
EntityInitializers.resolve(configBag, LOG_WARNING_GRACE_TIME);
+        FunctionPollConfig<Object, String> poll = new 
FunctionPollConfig<>(sensor)
+                .callable(new ContainerSensorCallable(entity, configBag, 
sensor));
+        standardPollConfig(entity, configBag, poll);
 
-        ((EntityInternal)entity).feeds().add(FunctionFeed.builder()
+        ((EntityInternal) entity).feeds().add(FunctionFeed.builder()
                 .entity(entity)
-                .period(initParam(SENSOR_PERIOD))
                 .onlyIfServiceUp()
-                .poll(new FunctionPollConfig<>(sensor)
-                        .callable(new Callable<Object>() {
-                            @Override
-                            public Object call() throws Exception {
-                                Task<ContainerTaskResult> containerTask = 
ContainerTaskFactory.newInstance()
-                                        .summary("Running " + 
EntityInitializers.resolve(configBag, SENSOR_NAME))
-                                        
.jobIdentifier(entity.getApplication()+"-"+entity.getId() + "-" + SENSOR_TAG)
-                                        .configure(configBag.getAllConfig())
-                                        .newTask();
-                                
DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
-                                String mainStdout = 
containerTask.getUnchecked(Duration.of(5, TimeUnit.MINUTES)).getMainStdout();
-                                return (new 
SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(), 
initParam(FORMAT), initParam(LAST_YAML_DOCUMENT))).apply(mainStdout);
-                            }
-                        })
-                        
.suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
-                        
.logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
-                        .logWarningGraceTime(logWarningGraceTime))
+                .poll(poll)
                 .build());
     }
 
+    public static class ContainerSensorCallable implements Callable<Object> {
+        private final Entity entity;
+        private final ConfigBag configBag;
+        private final Sensor<?> sensor;
 
-}
+        public ContainerSensorCallable(Entity entity, ConfigBag configBag, 
Sensor<?> sensor) {
+            this.entity = entity;
+            this.configBag = configBag;
+            this.sensor = sensor;
+        }
+        public Object call() throws Exception {
+            Task<ContainerTaskResult> containerTask = 
ContainerTaskFactory.newInstance()
+                    .summary("Running " + 
EntityInitializers.resolve(configBag, SENSOR_NAME))
+                    .jobIdentifier(entity.getApplication() + "-" + 
entity.getId() + "-" + SENSOR_TAG)
+                    .configure(configBag.getAllConfig())
+                    .newTask();
+            DynamicTasks.queueIfPossible(containerTask).orSubmitAsync(entity);
+            String mainStdout = 
containerTask.getUnchecked(configBag.get(TIMEOUT)).getMainStdout();
+            return (new 
SshCommandSensor.CoerceOutputFunction<>(sensor.getTypeToken(), 
configBag.get(FORMAT), configBag.get(LAST_YAML_DOCUMENT))).apply(mainStdout);
+        }
 
+        @Override
+        public String toString() {
+            return "container-sensor[" + configBag.get(CONTAINER_IMAGE) + "]";
+        }
+    }
+
+}
diff --git 
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
 
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
index 84cb436b4b..310888cf08 100644
--- 
a/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
+++ 
b/software/base/src/main/java/org/apache/brooklyn/tasks/kubectl/ContainerTaskFactory.java
@@ -23,7 +23,6 @@ import com.google.gson.Gson;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.mgmt.TaskAdaptable;
-import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityInternal;
@@ -124,7 +123,7 @@ public class ContainerTaskFactory<T extends 
ContainerTaskFactory<T,RET>,RET> imp
 
                     LOG.debug("Submitting container job in namespace 
"+namespace+", name "+kubeJobName);
 
-                    Map<String, String> env = new 
ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config,
 BrooklynConfigKeys.SHELL_ENVIRONMENT));
+                    Map<String, String> env = new 
ShellEnvironmentSerializer(((EntityInternal)entity).getManagementContext()).serialize(EntityInitializers.resolve(config,
 SHELL_ENVIRONMENT));
                     final 
BrooklynBomOsgiArchiveInstaller.FileWithTempInfo<File> jobYaml =  new 
KubeJobFileCreator()
                             .withImage(containerImage)
                             .withImagePullPolicy(containerImagePullPolicy)
@@ -578,7 +577,7 @@ public class ContainerTaskFactory<T extends 
ContainerTaskFactory<T,RET>,RET> imp
             return null;
         }
 
-        LOG.info("Deleting namespace " + namespace);
+        LOG.debug("Deleting namespace " + namespace);
         // do this not as a subtask so we can run even if the main queue fails
         ProcessTaskFactory<String> tf = 
newSimpleTaskFactory(String.format(NAMESPACE_DELETE_CMD, 
namespace)).summary("Tear down containers").allowingNonZeroExitCode();
         if (!requireSuccess) tf = tf.allowingNonZeroExitCode();
@@ -611,7 +610,7 @@ public class ContainerTaskFactory<T extends 
ContainerTaskFactory<T,RET>,RET> imp
         return environmentVariablesRaw(map);
     }
     public T environmentVariablesRaw(Map<String,?> map) {
-        config.put(BrooklynConfigKeys.SHELL_ENVIRONMENT, MutableMap.copyOf( 
map ) );
+        config.put(SHELL_ENVIRONMENT, MutableMap.copyOf( map ) );
         return self();
     }
 
@@ -620,7 +619,7 @@ public class ContainerTaskFactory<T extends 
ContainerTaskFactory<T,RET>,RET> imp
         return this.environmentVariableRaw(key, (Object)val);
     }
     public T environmentVariableRaw(String key, Object val) {
-        return environmentVariablesRaw(MutableMap.copyOf( 
config.get(BrooklynConfigKeys.SHELL_ENVIRONMENT) ).add(key, val));
+        return environmentVariablesRaw(MutableMap.copyOf( 
config.get(SHELL_ENVIRONMENT) ).add(key, val));
     }
 
     @Override
diff --git 
a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
 
b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
index 5de54807d3..679162d49e 100644
--- 
a/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
+++ 
b/software/base/src/test/java/org/apache/brooklyn/tasks/kubectl/ContainerSensorTest.java
@@ -21,13 +21,20 @@ package org.apache.brooklyn.tasks.kubectl;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Dumper;
 import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
 import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
 import org.testng.annotations.Test;
 
 @SuppressWarnings( "UnstableApiUsage")
@@ -123,4 +130,30 @@ public class ContainerSensorTest extends 
BrooklynAppUnitTestSupport {
         EntityAsserts.assertAttributeEqualsEventually(parentEntity, 
Attributes.SERVICE_UP, true);
         EntityAsserts.assertAttributeEventually(parentEntity, 
Sensors.newStringSensor("tf-version-sensor"), s -> s.contains("Terraform"));
     }
+
+    @Test
+    public void testTriggeredContainerSensor() {
+        AttributeSensor<Object> trigger = Sensors.newSensor(Object.class, 
"the-trigger");
+        AttributeSensor<Object> triggered = Sensors.newSensor(Object.class, 
"triggered");
+        ConfigBag parameters = ConfigBag.newInstance(MutableMap.of(
+                ContainerCommons.CONTAINER_IMAGE, "stedolan/jq",
+                ContainerCommons.CONTAINER_IMAGE_PULL_POLICY, 
PullPolicy.IF_NOT_PRESENT,
+                ContainerCommons.SHELL_ENVIRONMENT, 
MutableMap.of("LAST_TRIGGER", DependentConfiguration.attributeWhenReady(app, 
trigger)),
+                ContainerCommons.BASH_SCRIPT, ImmutableList.of("echo " + 
"$LAST_TRIGGER" + " | jq .value"),
+                ContainerSensor.SENSOR_TRIGGERS, 
MutableList.of(MutableMap.of("entity", app.getId(), "sensor", "the-trigger")),
+                ContainerSensor.SENSOR_NAME, "triggered"));
+
+        ContainerSensor<String> initializer = new 
ContainerSensor<>(parameters);
+        TestEntity child = 
app.createAndManageChild(EntitySpec.create(TestEntity.class).addInitializer(initializer));
+        app.start(ImmutableList.of());
+
+        EntityAsserts.assertAttributeEquals(child, triggered, null);
+        app.sensors().set(trigger, "{ \"name\": \"bob\", \"value\": 3 }");
+
+        Time.sleep(Duration.ONE_SECOND);
+        Dumper.dumpInfo(app);
+
+        EntityAsserts.assertAttributeEventuallyNonNull(child, triggered);
+        EntityAsserts.assertAttributeEquals(child, triggered, "3");
+    }
 }
diff --git 
a/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
 
b/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
index ca18b25048..08d9a2ec03 100644
--- 
a/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
+++ 
b/software/winrm/src/main/java/org/apache/brooklyn/core/sensor/windows/WinRmCommandSensor.java
@@ -30,6 +30,7 @@ import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.entity.EntityInitializers;
 import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.sensor.AbstractAddSensorFeed;
+import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.core.sensor.http.HttpRequestSensor;
 import org.apache.brooklyn.feed.CommandPollConfig;
 import org.apache.brooklyn.feed.ssh.SshValueFunctions;
@@ -63,7 +64,7 @@ import com.google.common.reflect.TypeToken;
  * @see HttpRequestSensor
  */
 @Beta
-public final class WinRmCommandSensor<T> extends AbstractAddSensorFeed<T> {
+public final class WinRmCommandSensor<T> extends 
AbstractAddTriggerableSensor<T> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(WinRmCommandSensor.class);
 
@@ -90,10 +91,6 @@ public final class WinRmCommandSensor<T> extends 
AbstractAddSensorFeed<T> {
             LOG.debug("Adding WinRM sensor {} to {}", name, entity);
         }
 
-        final Boolean suppressDuplicates = 
EntityInitializers.resolve(initParams(), SUPPRESS_DUPLICATES);
-        final Duration logWarningGraceTimeOnStartup = 
EntityInitializers.resolve(initParams(), LOG_WARNING_GRACE_TIME_ON_STARTUP);
-        final Duration logWarningGraceTime = 
EntityInitializers.resolve(initParams(), LOG_WARNING_GRACE_TIME);
-
         Supplier<Map<String,String>> envSupplier = new 
Supplier<Map<String,String>>() {
             @SuppressWarnings("serial")
             @Override
@@ -127,16 +124,15 @@ public final class WinRmCommandSensor<T> extends 
AbstractAddSensorFeed<T> {
                 .period(initParam(SENSOR_PERIOD))
                 .env(envSupplier)
                 .command(commandSupplier)
-                .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates))
                 .checkSuccess(SshValueFunctions.exitStatusEquals(0))
                 .onFailureOrException(Functions.constant((T) null))
                 .onSuccess(Functions.compose(new Function<String, T>() {
                         @Override
                         public T apply(String input) {
                             return 
TypeCoercions.coerce(Strings.trimEnd(input), (Class<T>) sensor.getType());
-                        }}, SshValueFunctions.stdout()))
-                .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup)
-                .logWarningGraceTime(logWarningGraceTime);
+                        }}, SshValueFunctions.stdout()));
+
+        standardPollConfig(entity, initParams(), pollConfig);
 
         CmdFeed feed = CmdFeed.builder()
                 .entity(entity)

Reply via email to