This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit b4c96f21fb71947857258251a479585e6bf2bad0
Author: Alex Heneveld <[email protected]>
AuthorDate: Fri Nov 11 11:01:54 2022 +0000

    tidy up of recent workflow-based adjuncts
    
    * collapse multiple triggers to single initial run
    * attach adjunct as tag for reporting on sensors
    * log information about runs, including condition not applicable
    * ensure feeds are treated like other adjuncts when persisted
---
 .../camp/brooklyn/WorkflowYamlRebindTest.java      |  67 ++++++++++++-
 .../brooklyn/camp/brooklyn/WorkflowYamlTest.java   |  30 +++---
 .../java/org/apache/brooklyn/core/feed/Poller.java | 104 +++++++++++++--------
 .../brooklyn/core/mgmt/BrooklynTaskTags.java       |  12 ++-
 .../mgmt/rebind/RebindExceptionHandlerImpl.java    |   2 +
 .../brooklyn/core/mgmt/rebind/RebindIteration.java |  27 +++---
 .../brooklyn/core/workflow/WorkflowPolicy.java     |   3 +-
 .../brooklyn/core/workflow/WorkflowSensor.java     |  38 ++++++--
 8 files changed, 205 insertions(+), 78 deletions(-)

diff --git 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java
 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java
index 71922a792c..a887c7fe5b 100644
--- 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java
+++ 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java
@@ -30,9 +30,8 @@ import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.BrooklynDslCommon;
 import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityAsserts;
-import org.apache.brooklyn.core.entity.StartableApplication;
+import org.apache.brooklyn.core.entity.*;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
 import org.apache.brooklyn.core.mgmt.rebind.RebindOptions;
 import org.apache.brooklyn.core.sensor.Sensors;
@@ -57,6 +56,7 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Predicate;
 
 public class WorkflowYamlRebindTest extends AbstractYamlRebindTest {
@@ -187,10 +187,11 @@ public class WorkflowYamlRebindTest extends 
AbstractYamlRebindTest {
                 "      sensor: myWorkflowSensor",
                 "      triggers:",
                 "        - trig",
+                "        - trig2",  // to make sure it doesn't publish too much
                 "      steps:",
                 "        - type: return",
                 "          value:",
-                "            n: $brooklyn:attributeWhenReady(\"trig\")",
+                "            n: ${entity.sensor.trig}",
                 "");
 
         waitForApplicationTasks(app);
@@ -199,11 +200,69 @@ public class WorkflowYamlRebindTest extends 
AbstractYamlRebindTest {
         child.sensors().set(Sensors.newIntegerSensor("trig"), 1);
         EntityAsserts.assertAttributeEqualsEventually(child, 
Sensors.newSensor(Object.class, "myWorkflowSensor"), MutableMap.of("n", 1));
 
+        Set<Task<?>> tt = 
BrooklynTaskTags.getTasksInAdjunctContext(mgmt().getExecutionManager(), 
Iterables.getOnlyElement(((EntityInternal) child).feeds().getFeeds()));
+        Asserts.assertThat(tt, ts -> ts.stream().anyMatch(ti -> 
ti.getDisplayName().contains("Workflow for sensor")));
+
+        Dumper.dumpInfo(app);
+        app = rebind();
+        child = Iterables.getOnlyElement(app.getChildren());
+
+        child.sensors().set(Sensors.newIntegerSensor("trig"), 2);
+        EntityAsserts.assertAttributeEqualsEventually(child, 
Sensors.newSensor(Object.class, "myWorkflowSensor"), MutableMap.of("n", 2));
+
+        tt = 
BrooklynTaskTags.getTasksInAdjunctContext(mgmt().getExecutionManager(), 
Iterables.getOnlyElement(((EntityInternal) child).feeds().getFeeds()));
+        Asserts.assertThat(tt, ts -> ts.stream().anyMatch(ti -> 
ti.getDisplayName().contains("Workflow for sensor")));
+    }
+
+    @Test(groups="WIP")
+    void testWorkflowSensorWithMutexRebind() throws Exception {
+        Entity app = createAndStartApplication(
+                "services:",
+                "- type: " + BasicEntity.class.getName(),
+                "  brooklyn.initializers:",
+                "  - type: workflow-sensor",
+                "    brooklyn.config:",
+                "      sensor: myWorkflowSensor",
+                "      triggers:",
+                "        - trig",
+                "        - trig2",  // to make sure it doesn't publish too much
+                "      steps:",
+                "        - step: workflow lock count",
+                "          steps:",
+                "           - let count = ${entity.sensor.count} ?? 0",
+                "           - let count = ${count} + 1",
+                "           - log count now ${count}",
+                "           - step: set-sensor count = ${count}",
+                "             replayable: yes",  // not needed for this test, 
but for good measure
+                "        - type: return",
+                "          value:",
+                "            n: ${entity.sensor.trig}",
+                "");
+
+        waitForApplicationTasks(app);
+        Entity child = Iterables.getOnlyElement(app.getChildren());
+
+        child.sensors().set(Sensors.newIntegerSensor("trig"), 1);
+        EntityAsserts.assertAttributeEqualsEventually(child, 
Sensors.newSensor(Object.class, "myWorkflowSensor"), MutableMap.of("n", 1));
+        // should run once initially and once on trigger
+        EntityAsserts.assertAttributeEqualsEventually(child, 
Sensors.newSensor(Object.class, "count"), 2);
+
+        Set<Task<?>> tt = 
BrooklynTaskTags.getTasksInAdjunctContext(mgmt().getExecutionManager(), 
Iterables.getOnlyElement(((EntityInternal) child).feeds().getFeeds()));
+        Asserts.assertThat(tt, ts -> ts.stream().anyMatch(ti -> 
ti.getDisplayName().contains("Workflow for sensor")));
+
+        Dumper.dumpInfo(app);
         app = rebind();
         child = Iterables.getOnlyElement(app.getChildren());
 
+        // is run again when feed restarts (but could weaken)
+        EntityAsserts.assertAttributeEqualsEventually(child, 
Sensors.newSensor(Object.class, "count"), 3);
+
         child.sensors().set(Sensors.newIntegerSensor("trig"), 2);
         EntityAsserts.assertAttributeEqualsEventually(child, 
Sensors.newSensor(Object.class, "myWorkflowSensor"), MutableMap.of("n", 2));
+        EntityAsserts.assertAttributeEquals(child, 
Sensors.newSensor(Object.class, "count"), 4);
+
+        tt = 
BrooklynTaskTags.getTasksInAdjunctContext(mgmt().getExecutionManager(), 
Iterables.getOnlyElement(((EntityInternal) child).feeds().getFeeds()));
+        Asserts.assertThat(tt, ts -> ts.stream().anyMatch(ti -> 
ti.getDisplayName().contains("Workflow for sensor")));
     }
 
 }
diff --git 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
index 597c22bbf0..a1dc3fc333 100644
--- 
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
+++ 
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java
@@ -68,6 +68,7 @@ import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool;
 import org.apache.brooklyn.util.text.Strings;
 import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -154,6 +155,13 @@ public class WorkflowYamlTest extends AbstractYamlTest {
         doTestWorkflowSensor("triggers: theTrigger", 
Duration.seconds(1)::isLongerThan);
     }
 
+    @Test(groups="Integration") // because delay
+    public void testWorkflowSensorTriggerDoesntRunTooMuch() throws Exception {
+        Entity entity = doTestWorkflowSensor("triggers: [ theTrigger, 
anotherTrigger ]", Duration.seconds(1)::isLongerThan);
+        Time.sleep(Duration.millis(500));
+        EntityAsserts.assertAttributeEqualsEventually(entity, 
MY_WORKFLOW_SENSOR, MutableMap.of("foo", "bar", "v", 1));
+    }
+
     @Test(groups="Integration") // because delay
     public void testWorkflowSensorPeriod() throws Exception {
         doTestWorkflowSensor("period: 2s", Duration.seconds(2)::isShorterThan);
@@ -189,7 +197,9 @@ public class WorkflowYamlTest extends AbstractYamlTest {
         doTestWorkflowPolicy("condition: { sensor: not_exist }\n" + "period: 
200 ms", null);
     }
 
-    void doTestWorkflowSensor(String triggers, Predicate<Duration> 
timeCheckOrNullIfShouldFail) throws Exception {
+    static final AttributeSensor<Object> MY_WORKFLOW_SENSOR = 
Sensors.newSensor(Object.class, "myWorkflowSensor");
+
+    Entity doTestWorkflowSensor(String triggers, Predicate<Duration> 
timeCheckOrNullIfShouldFail) throws Exception {
         Entity app = createAndStartApplication(
                 "services:",
                 "- type: " + BasicEntity.class.getName(),
@@ -216,17 +226,16 @@ public class WorkflowYamlTest extends AbstractYamlTest {
         Duration d1 = Duration.of(sw);
 
         Entity entity = Iterables.getOnlyElement(app.getChildren());
-        AttributeSensor<Object> s = Sensors.newSensor(Object.class, 
"myWorkflowSensor");
 
         if (timeCheckOrNullIfShouldFail!=null) {
-            EntityAsserts.assertAttributeEventuallyNonNull(entity, s);
+            EntityAsserts.assertAttributeEventuallyNonNull(entity, 
MY_WORKFLOW_SENSOR);
             Duration d2 = Duration.of(sw).subtract(d1);
             // initial set should be soon after startup
             Asserts.assertThat(d2, Duration.millis(500)::isLongerThan);
-            EntityAsserts.assertAttributeEqualsEventually(entity, s, 
MutableMap.of("foo", "bar", "v", 0));
+            EntityAsserts.assertAttributeEqualsEventually(entity, 
MY_WORKFLOW_SENSOR, MutableMap.of("foo", "bar", "v", 0));
 
             entity.sensors().set(Sensors.newStringSensor("theTrigger"), "go");
-            EntityAsserts.assertAttributeEqualsEventually(entity, s, 
MutableMap.of("foo", "bar", "v", 1));
+            EntityAsserts.assertAttributeEqualsEventually(entity, 
MY_WORKFLOW_SENSOR, MutableMap.of("foo", "bar", "v", 1));
             Duration d3 = Duration.of(sw).subtract(d2);
             // the next iteration should obey the time constraint specified 
above
             if (!timeCheckOrNullIfShouldFail.test(d3)) Asserts.fail("Timing 
error, took " + d3);
@@ -236,9 +245,10 @@ public class WorkflowYamlTest extends AbstractYamlTest {
             // step definitions should not be resolved by jackson
             defs.forEach(def -> Asserts.assertThat(def, d -> !(d instanceof 
WorkflowStepDefinition)));
         } else {
-            EntityAsserts.assertAttributeEqualsContinually(entity, s, null);
+            EntityAsserts.assertAttributeEqualsContinually(entity, 
MY_WORKFLOW_SENSOR, null);
             Asserts.assertThat(new 
WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(entity).values(), 
Collection::isEmpty);
         }
+        return entity;
     }
 
     public void doTestWorkflowPolicy(String triggers, Predicate<Duration> 
timeCheckOrNullIfShouldFail) throws Exception {
@@ -274,24 +284,22 @@ public class WorkflowYamlTest extends AbstractYamlTest {
         // should really ID be settable from flag?
         Asserts.assertEquals(policy.getId(), "set-my-workflow-sensor");
 
-        AttributeSensor<Object> s = Sensors.newSensor(Object.class, 
"myWorkflowSensor");
-
         if (timeCheckOrNullIfShouldFail!=null) {
 //            EntityAsserts.assertAttributeEventuallyNonNull(entity, s);
-            EntityAsserts.assertAttributeEquals(entity, s, null);
+            EntityAsserts.assertAttributeEquals(entity, MY_WORKFLOW_SENSOR, 
null);
             Duration d2 = Duration.of(sw).subtract(d1);
             // initial set should be soon after startup
             Asserts.assertThat(d2, Duration.millis(500)::isLongerThan);
 //            EntityAsserts.assertAttributeEqualsEventually(entity, s, 
MutableMap.of("foo", "bar", "v", 0));
 
             entity.sensors().set(Sensors.newStringSensor("theTrigger"), "go");
-            EntityAsserts.assertAttributeEqualsEventually(entity, s, 
MutableMap.of("foo", "bar", "v", 0));
+            EntityAsserts.assertAttributeEqualsEventually(entity, 
MY_WORKFLOW_SENSOR, MutableMap.of("foo", "bar", "v", 0));
 //            EntityAsserts.assertAttributeEqualsEventually(entity, s, 
MutableMap.of("foo", "bar", "v", 1));
             Duration d3 = Duration.of(sw).subtract(d2);
             // the next iteration should obey the time constraint specified 
above
             if (!timeCheckOrNullIfShouldFail.test(d3)) Asserts.fail("Timing 
error, took " + d3);
         } else {
-            EntityAsserts.assertAttributeEqualsContinually(entity, s, null);
+            EntityAsserts.assertAttributeEqualsContinually(entity, 
MY_WORKFLOW_SENSOR, null);
         }
     }
 
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java 
b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index 5ea02652b9..440228be9b 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -21,13 +21,12 @@ package org.apache.brooklyn.core.feed;
 import java.util.LinkedHashSet;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.Task;
@@ -36,13 +35,13 @@ import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.objs.AbstractEntityAdjunct;
-import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.predicates.DslPredicates;
 import org.apache.brooklyn.util.core.task.DynamicSequentialTask;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
 import org.apache.brooklyn.util.core.task.ScheduledTask;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -71,7 +70,7 @@ public class Poller<V> {
     private final Set<Callable<?>> oneOffJobs = new 
LinkedHashSet<Callable<?>>();
     private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>();
     private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>();
-    private final Set<ScheduledTask> tasks = new 
LinkedHashSet<ScheduledTask>();
+    private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<>();
     private volatile boolean started = false;
 
     public <PI,PC extends PollConfig> void scheduleFeed(AbstractFeed feed, 
SetMultimap<PI,PC> polls, Function<PI,Callable<?>> jobFactory) {
@@ -132,6 +131,7 @@ public class Poller<V> {
     private static class PollJob<V> {
         final PollHandler<? super V> handler;
         final Duration pollPeriod;
+        final Callable<?> job;
         final Runnable wrappedJob;
         final Entity pollTriggerEntity;
         final Sensor<?> pollTriggerSensor;
@@ -149,6 +149,7 @@ public class Poller<V> {
             this.pollTriggerEntity = sensorSource;
             this.pollTriggerSensor = sensor;
             this.pollCondition = pollCondition;
+            this.job = job;
             wrappedJob = new Runnable() {
                 @Override
                 public void run() {
@@ -157,7 +158,8 @@ public class Poller<V> {
                             DslPredicates.DslPredicate pc = 
pollCondition.get();
                             if (pc!=null) {
                                 if 
(!pc.apply(BrooklynTaskTags.getContextEntity(Tasks.current()))) {
-                                    if (log.isTraceEnabled()) 
log.trace("PollJob for {} skipped because condition does not apply", job);
+                                    if (log.isTraceEnabled()) 
log.trace("Skipping execution for PollJob {} because condition does not apply", 
job);
+                                    log.debug("Skipping poll/feed execution 
because condition does not apply");  // log so we can see in log viewer
                                     return;
                                 }
                             }
@@ -231,33 +233,42 @@ public class Poller<V> {
         
         Duration minPeriod = null;
         Set<String> sensorSummaries = MutableSet.of();
-        for (final PollJob<V> pollJob : pollJobs) {
-            final String scheduleName = MutableList.of(adjunct !=null ? 
adjunct.getDisplayName() : null, pollJob.handler.getDescription())
-                    
.stream().filter(Strings::isNonBlank).collect(Collectors.joining("; "));
-            boolean added = false;
 
-            Callable<Task<?>> tf = () -> {
-                DynamicSequentialTask<Void> task = new 
DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, 
"entity", entity),
-                        () -> {
-                            if (!Entities.isManagedActive(entity)) {
-                                return null;
-                            }
-                            if (onlyIfServiceUp && 
!Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
-                                return null;
-                            }
-                            pollJob.wrappedJob.run();
+        final Function<PollJob,String> scheduleNameFn = pollJob -> 
MutableList.of(adjunct !=null ? adjunct.getDisplayName() : null, 
pollJob.handler.getDescription())
+                
.stream().filter(Strings::isNonBlank).collect(Collectors.joining("; "));
+
+        BiFunction<Runnable,String,Task<?>> tf = (job, scheduleName) -> {
+            DynamicSequentialTask<Void> task = new 
DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, 
"entity", entity),
+                    () -> {
+                        if (!Entities.isManagedActive(entity)) {
                             return null;
-                        });
-                // explicitly make non-transient -- we want to see its 
execution, even if parent is transient
-                BrooklynTaskTags.addTagDynamically(task, 
BrooklynTaskTags.NON_TRANSIENT_TASK_TAG);
-                return task;
-            };
+                        }
+                        if (onlyIfServiceUp && 
!Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
+                            return null;
+                        }
+                        job.run();
+                        return null;
+                    });
+            // explicitly make non-transient -- we want to see its execution, 
even if parent is transient
+            BrooklynTaskTags.addTagDynamically(task, 
BrooklynTaskTags.NON_TRANSIENT_TASK_TAG);
+            return task;
+        };
+        Multimap<Callable,PollJob> nonScheduledJobs = 
Multimaps.newSetMultimap(MutableMap.of(), MutableSet::of);
+        pollJobs.forEach(pollJob -> nonScheduledJobs.put(pollJob.job, 
pollJob));
 
-            ScheduledTask.Builder tb = ScheduledTask.builder(tf)
-                    .cancelOnException(false)
-                    .tag(adjunct != null ? 
BrooklynTaskTags.tagForContextAdjunct(adjunct) : null);
+        // 'runInitially' could be an option on the job; currently we always do
+        // if it's a scheduled task, that happens automatically; if it's a 
triggered task
+        // we collect the distinct runnables and run each of those
+        // (the poll job model doesn't work perfectly since usually all 
schedules/triggers are for the same job)
+
+        for (final PollJob<V> pollJob : pollJobs) {
+            String scheduleName = scheduleNameFn.apply(pollJob);
+            boolean added = false;
 
             if (pollJob.pollPeriod!=null && 
pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
+                ScheduledTask.Builder tb = ScheduledTask.builder(() -> 
tf.apply(pollJob.wrappedJob, scheduleName))
+                        .cancelOnException(false)
+                        .tag(adjunct != null ? 
BrooklynTaskTags.tagForContextAdjunct(adjunct) : null);
                 added = true;
                 tb.displayName("Periodic: " + scheduleName);
                 tb.period(pollJob.pollPeriod);
@@ -265,11 +276,12 @@ public class Poller<V> {
                 if (minPeriod==null || 
(pollJob.pollPeriod.isShorterThan(minPeriod))) {
                     minPeriod = pollJob.pollPeriod;
                 }
-            } else {
-                // if no period, we simply need to run it initially
-                tb.displayName("Initial: "+scheduleName);
+                ScheduledTask st = tb.build();
+                scheduledTasks.add(st);
+                log.debug("Submitting scheduled task "+st+" for poll/feed 
"+this+", job "+pollJob);
+                Entities.submit(entity, st);
+                nonScheduledJobs.removeAll(pollJob.job);
             }
-            tasks.add(Entities.submit(entity, tb.build()));
 
             if (pollJob.pollTriggerSensor !=null) {
                 added = true;
@@ -279,11 +291,12 @@ public class Poller<V> {
                 }
                 String summary = pollJob.pollTriggerSensor.getName();
                 if (pollJob.pollTriggerEntity!=null && 
!pollJob.pollTriggerEntity.equals(entity)) summary += " on 
"+pollJob.pollTriggerEntity;
+                log.debug("Adding subscription to "+summary+" for poll/feed 
"+this+", job "+pollJob);
                 sensorSummaries.add(summary);
                 pollJob.subscription = 
adjunct.subscriptions().subscribe(pollJob.pollTriggerEntity !=null ? 
pollJob.pollTriggerEntity : adjunct.getEntity(), pollJob.pollTriggerSensor, 
event -> {
                     // submit this on every event
                     try {
-                        adjunct.getExecutionContext().submit(tf.call());
+                        
adjunct.getExecutionContext().submit(tf.apply(pollJob.wrappedJob, 
scheduleName));
                     } catch (Exception e) {
                         throw Exceptions.propagate(e);
                     }
@@ -291,9 +304,24 @@ public class Poller<V> {
             }
 
             if (!added) {
-                if (log.isDebugEnabled()) log.debug("Activating poll (as 
one-off, as no period and no subscriptions) for {} (using {})", new Object[] 
{entity, this});
+                if (log.isDebugEnabled()) log.debug("Empty poll job 
"+pollJob+" in "+this+" for "+entity+"; if all jobs are empty (or trigger 
only), will add a trivial one-time initial task");
             }
         }
+
+        // no period for these, but we do need to run them initially; 
combine them if the Callable is the same (e.g. multiple triggers)
+        // note the PollJob is one per trigger, and the wrappedJob is specific 
to the poll job, but doesn't depend on the trigger, so we can just take the 
first
+        nonScheduledJobs.asMap().forEach( (jobC,jobP) -> {
+            Runnable job = jobP.iterator().next().wrappedJob;
+            String jobSummaries = jobP.stream().map(j -> 
j.handler.getDescription()).filter(Strings::isNonBlank).collect(Collectors.joining(",
 "));
+            String name =  (adjunct !=null ? adjunct.getDisplayName() : 
"anonymous")+(Strings.isNonBlank(jobSummaries) ? "; "+jobSummaries : "");
+            Task<Object> t = 
Tasks.builder().dynamic(true).displayName("Initial: " +name)
+                    .body(
+                            () -> DynamicTasks.queue(tf.apply(job, 
name)).getUnchecked())
+                    .tag(adjunct != null ? 
BrooklynTaskTags.tagForContextAdjunct(adjunct) : null)
+                    .build();
+            log.debug("Submitting initial task "+t+" for poll/feed "+this+", 
job "+job+" (because otherwise is trigger-only)");
+            Entities.submit(entity, t);
+        });
         
         if (adjunct !=null) {
             if (sensorSummaries.isEmpty()) {
@@ -326,7 +354,7 @@ public class Poller<V> {
         for (Task<?> task : oneOffTasks) {
             if (task != null) task.cancel(true);
         }
-        for (ScheduledTask task : tasks) {
+        for (ScheduledTask task : scheduledTasks) {
             if (task != null) task.cancel();
         }
         for (PollJob<?> j: pollJobs) {
@@ -336,12 +364,12 @@ public class Poller<V> {
             }
         }
         oneOffTasks.clear();
-        tasks.clear();
+        scheduledTasks.clear();
     }
 
     public boolean isRunning() {
         boolean hasActiveTasks = false;
-        for (Task<?> task: tasks) {
+        for (Task<?> task: scheduledTasks) {
             if (task.isBegun() && !task.isDone()) {
                 hasActiveTasks = true;
                 break;
@@ -349,7 +377,7 @@ public class Poller<V> {
         }
         boolean hasSubscriptions = pollJobs.stream().anyMatch(j -> 
j.subscription!=null);
         if (!started && hasActiveTasks) {
-            log.warn("Poller should not be running, but has active tasks, 
tasks: "+tasks);
+            log.warn("Poller should not be running, but has active tasks, 
tasks: "+ scheduledTasks);
         }
         if (!started && hasSubscriptions) {
             log.warn("Poller should not be running, but has subscriptions on 
jobs: "+pollJobs);
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
index 423335dd18..f18ba372d1 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
@@ -194,7 +194,7 @@ public class BrooklynTaskTags extends TaskTags {
      * Tasks with this tag will also have a {@link 
#tagForContextEntity(Entity)}.
      */
     public static WrappedObject<EntityAdjunct> 
tagForContextAdjunct(EntityAdjunct adjunct) {
-        return new WrappedObject<EntityAdjunct>(CONTEXT_ADJUNCT, adjunct);
+        return new WrappedObject<>(CONTEXT_ADJUNCT, adjunct);
     }
     
 
@@ -235,6 +235,16 @@ public class BrooklynTaskTags extends TaskTags {
     public static Entity getContextEntity(Task<?> task) {
         return getWrappedEntityOfType(task, CONTEXT_ENTITY);
     }
+    public static EntityAdjunct getContextEntityAdjunct(Task<?> task, boolean 
recursively) {
+        WrappedObject<EntityAdjunct> result = 
getWrappedObjectTagOfType(getTagsFast(task), CONTEXT_ADJUNCT, 
EntityAdjunct.class);
+        if (result==null) {
+            if (recursively && task!=null) {
+                return getContextEntityAdjunct(task.getSubmittedByTask(), 
recursively);
+            }
+            return null;
+        }
+        return result.object;
+    }
 
     public static Object getTargetOrContextEntityTag(Task<?> task) {
         if (task == null) return null;
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindExceptionHandlerImpl.java
 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindExceptionHandlerImpl.java
index c29de36204..9a604651fc 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindExceptionHandlerImpl.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindExceptionHandlerImpl.java
@@ -273,6 +273,8 @@ public class RebindExceptionHandlerImpl implements 
RebindExceptionHandler {
             throw new RuntimeInterruptedException("Interruption discovered 
when recording dangling feed "+id);
         }
 
+        if (createAdjunctProxy!=null) return (Feed) 
createAdjunctProxy.apply(Feed.class, id);
+
         missingFeeds.add(id);
         if (danglingRefFailureMode == 
RebindManager.RebindFailureMode.FAIL_FAST) {
             throw new IllegalStateException("No feed found with id "+id);
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index fe3ead758d..03d8018707 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -558,18 +558,6 @@ public abstract class RebindIteration {
             logRebindingDebug("Not rebinding enrichers; feature disabled: {}", 
memento.getEnricherIds());
         }
 
-        if (!adjunctProxies.isEmpty()) {
-            LOG.warn("Adjunct proxies not empty, likely indicating dangling 
references: "+adjunctProxies);
-            adjunctProxies.entrySet().forEach(entry -> {
-                if (entry.getValue() instanceof Policy) 
exceptionHandler.onDanglingPolicyRef(entry.getKey());
-                else if (entry.getValue() instanceof Enricher) 
exceptionHandler.onDanglingEnricherRef(entry.getKey());
-                else {
-                    LOG.warn("Adjunct proxy for "+entry.getKey()+" is of 
unexpected type; "+entry.getValue()+"; reporting as dangling of unknown type");
-                    exceptionHandler.onDanglingUntypedItemRef(entry.getKey());
-                }
-            });
-            adjunctProxies.clear();
-        }
 
         // Instantiate feeds
         if (rebindManager.persistFeedsEnabled) {
@@ -579,6 +567,7 @@ public abstract class RebindIteration {
 
                 try {
                     Feed feed = instantiator.newFeed(feedMemento);
+                    EntityAdjunctProxyImpl.resetDelegate( 
adjunctProxies.remove(feed.getId()) , feed);
                     rebindContext.registerFeed(feedMemento.getId(), feed);
                     // started during associateAdjunctsWithEntities by 
RebindAdjuncts
                 } catch (Exception e) {
@@ -588,6 +577,20 @@ public abstract class RebindIteration {
         } else {
             logRebindingDebug("Not rebinding feeds; feature disabled: {}", 
memento.getFeedIds());
         }
+
+        if (!adjunctProxies.isEmpty()) {
+            LOG.warn("Adjunct proxies not empty, likely indicating dangling 
references: "+adjunctProxies);
+            adjunctProxies.entrySet().forEach(entry -> {
+                if (entry.getValue() instanceof Policy) 
exceptionHandler.onDanglingPolicyRef(entry.getKey());
+                else if (entry.getValue() instanceof Enricher) 
exceptionHandler.onDanglingEnricherRef(entry.getKey());
+                else {
+                    LOG.warn("Adjunct proxy for "+entry.getKey()+" is of 
unexpected type; "+entry.getValue()+"; reporting as dangling of unknown type");
+                    exceptionHandler.onDanglingUntypedItemRef(entry.getKey());
+                }
+            });
+            adjunctProxies.clear();
+        }
+
     }
 
     protected void reconstructEverything() {
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java
index 1656d3c07a..af39ae6638 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java
@@ -62,7 +62,6 @@ public final class WorkflowPolicy<T> extends AbstractPolicy {
 
     // ? - do we need to have an option not to run when added?
 
-
     public WorkflowPolicy() {}
     public WorkflowPolicy(Map<?,?> params) {
         super(params);
@@ -123,7 +122,7 @@ public final class WorkflowPolicy<T> extends AbstractPolicy {
 
         Set<PollConfig> pollConfigs = MutableSet.of(pc);
         poller.schedulePoll(this, pollConfigs, new WorkflowSensor.WorkflowPollCallable(
-                getDisplayName() + " (workflow)", this, config().getBag()), new PolicyNoOpPollHandler());
+                getDisplayName() + " (workflow)", config().getBag(), this), new PolicyNoOpPollHandler());
 
         if (!isSuspended()) resume();
     }
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java
index 70d9513530..fd12e4cdab 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java
@@ -19,6 +19,7 @@
 package org.apache.brooklyn.core.workflow;
 
 import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityInitializer;
@@ -41,6 +42,7 @@ import org.apache.brooklyn.feed.function.FunctionPollConfig;
 import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
 import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.guava.Maybe;
 import org.slf4j.Logger;
@@ -83,15 +85,15 @@ public final class WorkflowSensor<T> extends AbstractAddTriggerableSensor<T> imp
     }
 
     private void apply(final EntityLocal entity, final ConfigBag params) {
-
         AttributeSensor<T> sensor = addSensor(entity);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Adding workflow sensor {} to {}", sensor.getName(), entity);
         }
 
+        WorkflowPollCallable wc = new WorkflowPollCallable("Workflow for sensor " + sensor.getName(), params, null);
         FunctionPollConfig<Object,Object> pollConfig = new FunctionPollConfig<Object,T>(sensor)
-                .callable(new WorkflowPollCallable("Workflow for sensor " + sensor.getName(), entity, params))
+                .callable(wc)
                 
.onSuccess(TypeCoercions.<T>function((Class)sensor.getTypeToken().getRawType()));
 
         standardPollConfig(entity, initParams(), pollConfig);
@@ -103,8 +105,8 @@ public final class WorkflowSensor<T> extends AbstractAddTriggerableSensor<T> imp
                 .poll(pollConfig);
 
         FunctionFeed feed = feedBuilder.build();
+        wc.init(feed);
         entity.addFeed(feed);
-
     }
 
     @Override
@@ -145,18 +147,28 @@ public final class WorkflowSensor<T> extends AbstractAddTriggerableSensor<T> imp
 
     static class WorkflowPollCallable implements Callable<Object> {
         private final String workflowCallableName;
-        private final BrooklynObject entityOrAdjunct;
-        private final ConfigBag params;
+        private BrooklynObject entityOrAdjunct;
+        private final Map<String,Object> params;
 
-        public WorkflowPollCallable(String workflowCallableName, BrooklynObject entityOrAdjunct, ConfigBag params) {
+        protected WorkflowPollCallable(String workflowCallableName, ConfigBag params, BrooklynObject entityOrAdjunct) {
             this.workflowCallableName = workflowCallableName;
+            this.params = params.getAllConfigRaw();
+            this.entityOrAdjunct = entityOrAdjunct;
+        }
+
+        /** used in some places where the entity/adjunct is set late */
+        public void init(BrooklynObject entityOrAdjunct) {
             this.entityOrAdjunct = entityOrAdjunct;
-            this.params = params;
         }
 
         @Override
         public Object call() throws Exception {
-            WorkflowExecutionContext wc = WorkflowExecutionContext.newInstancePersisted(entityOrAdjunct, workflowCallableName, params, null, null, null);
+            BrooklynObject entityOrAdjunct = this.entityOrAdjunct;
+            if (entityOrAdjunct==null) entityOrAdjunct = BrooklynTaskTags.getContextEntityAdjunct(Tasks.current(), false);
+            if (entityOrAdjunct==null) entityOrAdjunct = BrooklynTaskTags.getContextEntity(Tasks.current());
+            if (entityOrAdjunct==null) throw new IllegalStateException("No entity adjunct or entity available for "+this);
+
+            WorkflowExecutionContext wc = WorkflowExecutionContext.newInstancePersisted(entityOrAdjunct, workflowCallableName, ConfigBag.newInstance(params), null, null, null);
             Task<Object> wt = wc.getTask(false /* condition checked by poll config framework */).get();
             if (entityOrAdjunct instanceof EntityAdjunct) {
                 // add tag to each task so it shows up in list on mgmt context
@@ -184,7 +196,13 @@ public final class WorkflowSensor<T> extends AbstractAddTriggerableSensor<T> imp
             }
         }
 
-        protected void onStart() {}
-        protected void onEnd() {}
+        @Override
+        public String toString() {
+            return "WorkflowPollCallable{" +
+                    "workflowCallableName='" + workflowCallableName + '\'' +
+                    ", context=" + entityOrAdjunct +
+                    ", params=" + params +
+                    '}';
+        }
     }
 }


Reply via email to