This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit b4c96f21fb71947857258251a479585e6bf2bad0 Author: Alex Heneveld <[email protected]> AuthorDate: Fri Nov 11 11:01:54 2022 +0000 tidy up of recent workflow-based adjuncts * collapse multiple triggers to single initial run * attach adjunct as tag for reporting on sensors * log information about runs, including condition not applicable * ensure feeds are treated like other adjuncts when persisted --- .../camp/brooklyn/WorkflowYamlRebindTest.java | 67 ++++++++++++- .../brooklyn/camp/brooklyn/WorkflowYamlTest.java | 30 +++--- .../java/org/apache/brooklyn/core/feed/Poller.java | 104 +++++++++++++-------- .../brooklyn/core/mgmt/BrooklynTaskTags.java | 12 ++- .../mgmt/rebind/RebindExceptionHandlerImpl.java | 2 + .../brooklyn/core/mgmt/rebind/RebindIteration.java | 27 +++--- .../brooklyn/core/workflow/WorkflowPolicy.java | 3 +- .../brooklyn/core/workflow/WorkflowSensor.java | 38 ++++++-- 8 files changed, 205 insertions(+), 78 deletions(-) diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java index 71922a792c..a887c7fe5b 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlRebindTest.java @@ -30,9 +30,8 @@ import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.BrooklynDslCommon; import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.entity.EntityAsserts; -import org.apache.brooklyn.core.entity.StartableApplication; +import org.apache.brooklyn.core.entity.*; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext; import org.apache.brooklyn.core.mgmt.rebind.RebindOptions; import org.apache.brooklyn.core.sensor.Sensors; @@ -57,6 +56,7 @@ import org.testng.annotations.Test; import java.io.File; import java.util.Map; +import java.util.Set; import java.util.function.Predicate; public class WorkflowYamlRebindTest extends AbstractYamlRebindTest { @@ -187,10 +187,11 @@ public class WorkflowYamlRebindTest extends AbstractYamlRebindTest { " sensor: myWorkflowSensor", " triggers:", " - trig", + " - trig2", // to make sure doesn't public too much " steps:", " - type: return", " value:", - " n: $brooklyn:attributeWhenReady(\"trig\")", + " n: ${entity.sensor.trig}", ""); waitForApplicationTasks(app); @@ -199,11 +200,69 @@ public class WorkflowYamlRebindTest extends AbstractYamlRebindTest { child.sensors().set(Sensors.newIntegerSensor("trig"), 1); EntityAsserts.assertAttributeEqualsEventually(child, Sensors.newSensor(Object.class, "myWorkflowSensor"), MutableMap.of("n", 1)); + Set<Task<?>> tt = BrooklynTaskTags.getTasksInAdjunctContext(mgmt().getExecutionManager(), Iterables.getOnlyElement(((EntityInternal) child).feeds().getFeeds())); + Asserts.assertThat(tt, ts -> ts.stream().anyMatch(ti -> ti.getDisplayName().contains("Workflow for sensor"))); + + Dumper.dumpInfo(app); + app = rebind(); + child = Iterables.getOnlyElement(app.getChildren()); + + child.sensors().set(Sensors.newIntegerSensor("trig"), 2); + EntityAsserts.assertAttributeEqualsEventually(child, Sensors.newSensor(Object.class, "myWorkflowSensor"), MutableMap.of("n", 2)); + + tt = BrooklynTaskTags.getTasksInAdjunctContext(mgmt().getExecutionManager(), Iterables.getOnlyElement(((EntityInternal) child).feeds().getFeeds())); + Asserts.assertThat(tt, ts -> ts.stream().anyMatch(ti -> ti.getDisplayName().contains("Workflow for sensor"))); + } + + @Test(groups="WIP") + void testWorkflowSensorWithMutexRebind() throws Exception { + Entity app = createAndStartApplication( + "services:", + "- type: " + BasicEntity.class.getName(), + " brooklyn.initializers:", + " - type: workflow-sensor", + " brooklyn.config:", + " sensor: myWorkflowSensor", + " triggers:", + " - trig", + " - trig2", // to make sure doesn't public too much + " steps:", + " - step: workflow lock count", + " steps:", + " - let count = ${entity.sensor.count} ?? 0", + " - let count = ${count} + 1", + " - log count now ${count}", + " - step: set-sensor count = ${count}", + " replayable: yes", // not needed for this test, but for good measure + " - type: return", + " value:", + " n: ${entity.sensor.trig}", + ""); + + waitForApplicationTasks(app); + Entity child = Iterables.getOnlyElement(app.getChildren()); + + child.sensors().set(Sensors.newIntegerSensor("trig"), 1); + EntityAsserts.assertAttributeEqualsEventually(child, Sensors.newSensor(Object.class, "myWorkflowSensor"), MutableMap.of("n", 1)); + // should run once initially and once on trigger + EntityAsserts.assertAttributeEqualsEventually(child, Sensors.newSensor(Object.class, "count"), 2); + + Set<Task<?>> tt = BrooklynTaskTags.getTasksInAdjunctContext(mgmt().getExecutionManager(), Iterables.getOnlyElement(((EntityInternal) child).feeds().getFeeds())); + Asserts.assertThat(tt, ts -> ts.stream().anyMatch(ti -> ti.getDisplayName().contains("Workflow for sensor"))); + + Dumper.dumpInfo(app); app = rebind(); child = Iterables.getOnlyElement(app.getChildren()); + // is run again when feed restarts (but could weaken) + EntityAsserts.assertAttributeEqualsEventually(child, Sensors.newSensor(Object.class, "count"), 3); + child.sensors().set(Sensors.newIntegerSensor("trig"), 2); EntityAsserts.assertAttributeEqualsEventually(child, Sensors.newSensor(Object.class, "myWorkflowSensor"), MutableMap.of("n", 2)); + EntityAsserts.assertAttributeEquals(child, Sensors.newSensor(Object.class, "count"), 4); + + tt = BrooklynTaskTags.getTasksInAdjunctContext(mgmt().getExecutionManager(), Iterables.getOnlyElement(((EntityInternal) child).feeds().getFeeds())); + Asserts.assertThat(tt, ts -> ts.stream().anyMatch(ti -> ti.getDisplayName().contains("Workflow for sensor"))); } } diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java index 597c22bbf0..a1dc3fc333 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java @@ -68,6 +68,7 @@ import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -154,6 +155,13 @@ public class WorkflowYamlTest extends AbstractYamlTest { doTestWorkflowSensor("triggers: theTrigger", Duration.seconds(1)::isLongerThan); } + @Test(groups="Integration") // because delay + public void testWorkflowSensorTriggerDoesntRunTooMuch() throws Exception { + Entity entity = doTestWorkflowSensor("triggers: [ theTrigger, anotherTrigger ]", Duration.seconds(1)::isLongerThan); + Time.sleep(Duration.millis(500)); + EntityAsserts.assertAttributeEqualsEventually(entity, MY_WORKFLOW_SENSOR, MutableMap.of("foo", "bar", "v", 1)); + } + @Test(groups="Integration") // because delay public void testWorkflowSensorPeriod() throws Exception { doTestWorkflowSensor("period: 2s", Duration.seconds(2)::isShorterThan); @@ -189,7 +197,9 @@ public class WorkflowYamlTest extends AbstractYamlTest { doTestWorkflowPolicy("condition: { sensor: not_exist }\n" + "period: 200 ms", null); } - void doTestWorkflowSensor(String triggers, Predicate<Duration> timeCheckOrNullIfShouldFail) throws Exception { + static final AttributeSensor<Object> MY_WORKFLOW_SENSOR = Sensors.newSensor(Object.class, "myWorkflowSensor"); + + Entity doTestWorkflowSensor(String triggers, Predicate<Duration> timeCheckOrNullIfShouldFail) throws Exception { Entity app = createAndStartApplication( "services:", "- type: " + BasicEntity.class.getName(), @@ -216,17 +226,16 @@ public class WorkflowYamlTest extends AbstractYamlTest { Duration d1 = Duration.of(sw); Entity entity = Iterables.getOnlyElement(app.getChildren()); - AttributeSensor<Object> s = Sensors.newSensor(Object.class, "myWorkflowSensor"); if (timeCheckOrNullIfShouldFail!=null) { - EntityAsserts.assertAttributeEventuallyNonNull(entity, s); + EntityAsserts.assertAttributeEventuallyNonNull(entity, MY_WORKFLOW_SENSOR); Duration d2 = Duration.of(sw).subtract(d1); // initial set should be soon after startup Asserts.assertThat(d2, Duration.millis(500)::isLongerThan); - EntityAsserts.assertAttributeEqualsEventually(entity, s, MutableMap.of("foo", "bar", "v", 0)); + EntityAsserts.assertAttributeEqualsEventually(entity, MY_WORKFLOW_SENSOR, MutableMap.of("foo", "bar", "v", 0)); entity.sensors().set(Sensors.newStringSensor("theTrigger"), "go"); - EntityAsserts.assertAttributeEqualsEventually(entity, s, MutableMap.of("foo", "bar", "v", 1)); + EntityAsserts.assertAttributeEqualsEventually(entity, MY_WORKFLOW_SENSOR, MutableMap.of("foo", "bar", "v", 1)); Duration d3 = Duration.of(sw).subtract(d2); // the next iteration should obey the time constraint specified above if (!timeCheckOrNullIfShouldFail.test(d3)) Asserts.fail("Timing error, took " + d3); @@ -236,9 +245,10 @@ public class WorkflowYamlTest extends AbstractYamlTest { // step definitions should not be resolved by jackson defs.forEach(def -> Asserts.assertThat(def, d -> !(d instanceof WorkflowStepDefinition))); } else { - EntityAsserts.assertAttributeEqualsContinually(entity, s, null); + EntityAsserts.assertAttributeEqualsContinually(entity, MY_WORKFLOW_SENSOR, null); Asserts.assertThat(new WorkflowStatePersistenceViaSensors(mgmt()).getWorkflows(entity).values(), Collection::isEmpty); } + return entity; } public void doTestWorkflowPolicy(String triggers, Predicate<Duration> timeCheckOrNullIfShouldFail) throws Exception { @@ -274,24 +284,22 @@ public class WorkflowYamlTest extends AbstractYamlTest { // should really ID be settable from flag? Asserts.assertEquals(policy.getId(), "set-my-workflow-sensor"); - AttributeSensor<Object> s = Sensors.newSensor(Object.class, "myWorkflowSensor"); - if (timeCheckOrNullIfShouldFail!=null) { // EntityAsserts.assertAttributeEventuallyNonNull(entity, s); - EntityAsserts.assertAttributeEquals(entity, s, null); + EntityAsserts.assertAttributeEquals(entity, MY_WORKFLOW_SENSOR, null); Duration d2 = Duration.of(sw).subtract(d1); // initial set should be soon after startup Asserts.assertThat(d2, Duration.millis(500)::isLongerThan); // EntityAsserts.assertAttributeEqualsEventually(entity, s, MutableMap.of("foo", "bar", "v", 0)); entity.sensors().set(Sensors.newStringSensor("theTrigger"), "go"); - EntityAsserts.assertAttributeEqualsEventually(entity, s, MutableMap.of("foo", "bar", "v", 0)); + EntityAsserts.assertAttributeEqualsEventually(entity, MY_WORKFLOW_SENSOR, MutableMap.of("foo", "bar", "v", 0)); // EntityAsserts.assertAttributeEqualsEventually(entity, s, MutableMap.of("foo", "bar", "v", 1)); Duration d3 = Duration.of(sw).subtract(d2); // the next iteration should obey the time constraint specified above if (!timeCheckOrNullIfShouldFail.test(d3)) Asserts.fail("Timing error, took " + d3); } else { - EntityAsserts.assertAttributeEqualsContinually(entity, s, null); + EntityAsserts.assertAttributeEqualsContinually(entity, MY_WORKFLOW_SENSOR, null); } } diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java index 5ea02652b9..440228be9b 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java @@ -21,13 +21,12 @@ package org.apache.brooklyn.core.feed; import java.util.LinkedHashSet; import java.util.Set; import java.util.concurrent.Callable; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import com.google.common.collect.Iterables; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; +import com.google.common.collect.*; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.SubscriptionHandle; import org.apache.brooklyn.api.mgmt.Task; @@ -36,13 +35,13 @@ import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.objs.AbstractEntityAdjunct; -import org.apache.brooklyn.core.policy.AbstractPolicy; import org.apache.brooklyn.core.sensor.AbstractAddTriggerableSensor; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.predicates.DslPredicates; import org.apache.brooklyn.util.core.task.DynamicSequentialTask; +import org.apache.brooklyn.util.core.task.DynamicTasks; import org.apache.brooklyn.util.core.task.ScheduledTask; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; @@ -71,7 +70,7 @@ public class Poller<V> { private final Set<Callable<?>> oneOffJobs = new LinkedHashSet<Callable<?>>(); private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>(); private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>(); - private final Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>(); + private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<>(); private volatile boolean started = false; public <PI,PC extends PollConfig> void scheduleFeed(AbstractFeed feed, SetMultimap<PI,PC> polls, Function<PI,Callable<?>> jobFactory) { @@ -132,6 +131,7 @@ public class Poller<V> { private static class PollJob<V> { final PollHandler<? super V> handler; final Duration pollPeriod; + final Callable<?> job; final Runnable wrappedJob; final Entity pollTriggerEntity; final Sensor<?> pollTriggerSensor; @@ -149,6 +149,7 @@ public class Poller<V> { this.pollTriggerEntity = sensorSource; this.pollTriggerSensor = sensor; this.pollCondition = pollCondition; + this.job = job; wrappedJob = new Runnable() { @Override public void run() { @@ -157,7 +158,8 @@ public class Poller<V> { DslPredicates.DslPredicate pc = pollCondition.get(); if (pc!=null) { if (!pc.apply(BrooklynTaskTags.getContextEntity(Tasks.current()))) { - if (log.isTraceEnabled()) log.trace("PollJob for {} skipped because condition does not apply", job); + if (log.isTraceEnabled()) log.trace("Skipping execution for PollJob {} because condition does not apply", job); + log.debug("Skipping poll/feed execution because condition does not apply"); // log so we can see in log viewer return; } } @@ -231,33 +233,42 @@ public class Poller<V> { Duration minPeriod = null; Set<String> sensorSummaries = MutableSet.of(); - for (final PollJob<V> pollJob : pollJobs) { - final String scheduleName = MutableList.of(adjunct !=null ? adjunct.getDisplayName() : null, pollJob.handler.getDescription()) - .stream().filter(Strings::isNonBlank).collect(Collectors.joining("; ")); - boolean added = false; - Callable<Task<?>> tf = () -> { - DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), - () -> { - if (!Entities.isManagedActive(entity)) { - return null; - } - if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) { - return null; - } - pollJob.wrappedJob.run(); + final Function<PollJob,String> scheduleNameFn = pollJob -> MutableList.of(adjunct !=null ? adjunct.getDisplayName() : null, pollJob.handler.getDescription()) + .stream().filter(Strings::isNonBlank).collect(Collectors.joining("; ")); + + BiFunction<Runnable,String,Task<?>> tf = (job, scheduleName) -> { + DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), + () -> { + if (!Entities.isManagedActive(entity)) { return null; - }); - // explicitly make non-transient -- we want to see its execution, even if parent is transient - BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG); - return task; - }; + } + if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) { + return null; + } + job.run(); + return null; + }); + // explicitly make non-transient -- we want to see its execution, even if parent is transient + BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG); + return task; + }; + Multimap<Callable,PollJob> nonScheduledJobs = Multimaps.newSetMultimap(MutableMap.of(), MutableSet::of); + pollJobs.forEach(pollJob -> nonScheduledJobs.put(pollJob.job, pollJob)); - ScheduledTask.Builder tb = ScheduledTask.builder(tf) - .cancelOnException(false) - .tag(adjunct != null ? BrooklynTaskTags.tagForContextAdjunct(adjunct) : null); + // 'runInitially' could be an option on the job; currently we always do + // if it's a scheduled task, that happens automatically; if it's a triggered task + // we collect the distinct runnables and run each of those + // (the poll job model doesn't work perfectly since usually all schedules/triggers are for the same job) + + for (final PollJob<V> pollJob : pollJobs) { + String scheduleName = scheduleNameFn.apply(pollJob); + boolean added = false; if (pollJob.pollPeriod!=null && pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) { + ScheduledTask.Builder tb = ScheduledTask.builder(() -> tf.apply(pollJob.wrappedJob, scheduleName)) + .cancelOnException(false) + .tag(adjunct != null ? BrooklynTaskTags.tagForContextAdjunct(adjunct) : null); added = true; tb.displayName("Periodic: " + scheduleName); tb.period(pollJob.pollPeriod); @@ -265,11 +276,12 @@ public class Poller<V> { if (minPeriod==null || (pollJob.pollPeriod.isShorterThan(minPeriod))) { minPeriod = pollJob.pollPeriod; } - } else { - // if no period, we simply need to run it initially - tb.displayName("Initial: "+scheduleName); + ScheduledTask st = tb.build(); + scheduledTasks.add(st); + log.debug("Submitting scheduled task "+st+" for poll/feed "+this+", job "+pollJob); + Entities.submit(entity, st); + nonScheduledJobs.removeAll(pollJob.job); } - tasks.add(Entities.submit(entity, tb.build())); if (pollJob.pollTriggerSensor !=null) { added = true; @@ -279,11 +291,12 @@ public class Poller<V> { } String summary = pollJob.pollTriggerSensor.getName(); if (pollJob.pollTriggerEntity!=null && !pollJob.pollTriggerEntity.equals(entity)) summary += " on "+pollJob.pollTriggerEntity; + log.debug("Adding subscription to "+summary+" for poll/feed "+this+", job "+pollJob); sensorSummaries.add(summary); pollJob.subscription = adjunct.subscriptions().subscribe(pollJob.pollTriggerEntity !=null ? pollJob.pollTriggerEntity : adjunct.getEntity(), pollJob.pollTriggerSensor, event -> { // submit this on every event try { - adjunct.getExecutionContext().submit(tf.call()); + adjunct.getExecutionContext().submit(tf.apply(pollJob.wrappedJob, scheduleName)); } catch (Exception e) { throw Exceptions.propagate(e); } @@ -291,9 +304,24 @@ public class Poller<V> { } if (!added) { - if (log.isDebugEnabled()) log.debug("Activating poll (as one-off, as no period and no subscriptions) for {} (using {})", new Object[] {entity, this}); + if (log.isDebugEnabled()) log.debug("Empty poll job "+pollJob+" in "+this+" for "+entity+"; if all jobs are empty (or trigger only), will add a trivial one-time initial task"); } } + + // no period for these, but we do need to run them initially, but combine if the Callable is the same (e.g. multiple triggers) + // not the PollJob is one per trigger, and the wrappedJob is specific to the poll job, but doesn't depend on the trigger, so we can just take the first + nonScheduledJobs.asMap().forEach( (jobC,jobP) -> { + Runnable job = jobP.iterator().next().wrappedJob; + String jobSummaries = jobP.stream().map(j -> j.handler.getDescription()).filter(Strings::isNonBlank).collect(Collectors.joining(", ")); + String name = (adjunct !=null ? adjunct.getDisplayName() : "anonymous")+(Strings.isNonBlank(jobSummaries) ? "; "+jobSummaries : ""); + Task<Object> t = Tasks.builder().dynamic(true).displayName("Initial: " +name) + .body( + () -> DynamicTasks.queue(tf.apply(job, name)).getUnchecked()) + .tag(adjunct != null ? BrooklynTaskTags.tagForContextAdjunct(adjunct) : null) + .build(); + log.debug("Submitting initial task "+t+" for poll/feed "+this+", job "+job+" (because otherwise is trigger-only)"); + Entities.submit(entity, t); + }); if (adjunct !=null) { if (sensorSummaries.isEmpty()) { @@ -326,7 +354,7 @@ public class Poller<V> { for (Task<?> task : oneOffTasks) { if (task != null) task.cancel(true); } - for (ScheduledTask task : tasks) { + for (ScheduledTask task : scheduledTasks) { if (task != null) task.cancel(); } for (PollJob<?> j: pollJobs) { @@ -336,12 +364,12 @@ public class Poller<V> { } } oneOffTasks.clear(); - tasks.clear(); + scheduledTasks.clear(); } public boolean isRunning() { boolean hasActiveTasks = false; - for (Task<?> task: tasks) { + for (Task<?> task: scheduledTasks) { if (task.isBegun() && !task.isDone()) { hasActiveTasks = true; break; @@ -349,7 +377,7 @@ public class Poller<V> { } boolean hasSubscriptions = pollJobs.stream().anyMatch(j -> j.subscription!=null); if (!started && hasActiveTasks) { - log.warn("Poller should not be running, but has active tasks, tasks: "+tasks); + log.warn("Poller should not be running, but has active tasks, tasks: "+ scheduledTasks); } if (!started && hasSubscriptions) { log.warn("Poller should not be running, but has subscriptions on jobs: "+pollJobs); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java index 423335dd18..f18ba372d1 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java @@ -194,7 +194,7 @@ public class BrooklynTaskTags extends TaskTags { * Tasks with this tag will also have a {@link #tagForContextEntity(Entity)}. */ public static WrappedObject<EntityAdjunct> tagForContextAdjunct(EntityAdjunct adjunct) { - return new WrappedObject<EntityAdjunct>(CONTEXT_ADJUNCT, adjunct); + return new WrappedObject<>(CONTEXT_ADJUNCT, adjunct); } @@ -235,6 +235,16 @@ public class BrooklynTaskTags extends TaskTags { public static Entity getContextEntity(Task<?> task) { return getWrappedEntityOfType(task, CONTEXT_ENTITY); } + public static EntityAdjunct getContextEntityAdjunct(Task<?> task, boolean recursively) { + WrappedObject<EntityAdjunct> result = getWrappedObjectTagOfType(getTagsFast(task), CONTEXT_ADJUNCT, EntityAdjunct.class); + if (result==null) { + if (recursively && task!=null) { + return getContextEntityAdjunct(task.getSubmittedByTask(), recursively); + } + return null; + } + return result.object; + } public static Object getTargetOrContextEntityTag(Task<?> task) { if (task == null) return null; diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindExceptionHandlerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindExceptionHandlerImpl.java index c29de36204..9a604651fc 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindExceptionHandlerImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindExceptionHandlerImpl.java @@ -273,6 +273,8 @@ public class RebindExceptionHandlerImpl implements RebindExceptionHandler { throw new RuntimeInterruptedException("Interruption discovered when recording dangling feed "+id); } + if (createAdjunctProxy!=null) return (Feed) createAdjunctProxy.apply(Feed.class, id); + missingFeeds.add(id); if (danglingRefFailureMode == RebindManager.RebindFailureMode.FAIL_FAST) { throw new IllegalStateException("No feed found with id "+id); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java index fe3ead758d..03d8018707 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java @@ -558,18 +558,6 @@ public abstract class RebindIteration { logRebindingDebug("Not rebinding enrichers; feature disabled: {}", memento.getEnricherIds()); } - if (!adjunctProxies.isEmpty()) { - LOG.warn("Adjunct proxies not empty, likely indicating dangling references: "+adjunctProxies); - adjunctProxies.entrySet().forEach(entry -> { - if (entry.getValue() instanceof Policy) exceptionHandler.onDanglingPolicyRef(entry.getKey()); - else if (entry.getValue() instanceof Enricher) exceptionHandler.onDanglingEnricherRef(entry.getKey()); - else { - LOG.warn("Adjunct proxy for "+entry.getKey()+" is of unexpected type; "+entry.getValue()+"; reporting as dangling of unknown type"); - exceptionHandler.onDanglingUntypedItemRef(entry.getKey()); - } - }); - adjunctProxies.clear(); - } // Instantiate feeds if (rebindManager.persistFeedsEnabled) { @@ -579,6 +567,7 @@ public abstract class RebindIteration { try { Feed feed = instantiator.newFeed(feedMemento); + EntityAdjunctProxyImpl.resetDelegate( adjunctProxies.remove(feed.getId()) , feed); rebindContext.registerFeed(feedMemento.getId(), feed); // started during associateAdjunctsWithEntities by RebindAdjuncts } catch (Exception e) { @@ -588,6 +577,20 @@ public abstract class RebindIteration { } else { logRebindingDebug("Not rebinding feeds; feature disabled: {}", memento.getFeedIds()); } + + if (!adjunctProxies.isEmpty()) { + LOG.warn("Adjunct proxies not empty, likely indicating dangling references: "+adjunctProxies); + adjunctProxies.entrySet().forEach(entry -> { + if (entry.getValue() instanceof Policy) exceptionHandler.onDanglingPolicyRef(entry.getKey()); + else if (entry.getValue() instanceof Enricher) exceptionHandler.onDanglingEnricherRef(entry.getKey()); + else { + LOG.warn("Adjunct proxy for "+entry.getKey()+" is of unexpected type; "+entry.getValue()+"; reporting as dangling of unknown type"); + exceptionHandler.onDanglingUntypedItemRef(entry.getKey()); + } + }); + adjunctProxies.clear(); + } + } protected void reconstructEverything() { diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java index 1656d3c07a..af39ae6638 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java @@ -62,7 +62,6 @@ public final class WorkflowPolicy<T> extends AbstractPolicy { // ? - do we need to have an option not to run when added? - public WorkflowPolicy() {} public WorkflowPolicy(Map<?,?> params) { super(params); @@ -123,7 +122,7 @@ public final class WorkflowPolicy<T> extends AbstractPolicy { Set<PollConfig> pollConfigs = MutableSet.of(pc); poller.schedulePoll(this, pollConfigs, new WorkflowSensor.WorkflowPollCallable( - getDisplayName() + " (workflow)", this, config().getBag()), new PolicyNoOpPollHandler()); + getDisplayName() + " (workflow)", config().getBag(), this), new PolicyNoOpPollHandler()); if (!isSuspended()) resume(); } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java index 70d9513530..fd12e4cdab 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java @@ -19,6 +19,7 @@ package org.apache.brooklyn.core.workflow; import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; import com.google.common.reflect.TypeToken; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityInitializer; @@ -41,6 +42,7 @@ import org.apache.brooklyn.feed.function.FunctionPollConfig; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.guava.Maybe; import org.slf4j.Logger; @@ -83,15 +85,15 @@ public final class WorkflowSensor<T> extends AbstractAddTriggerableSensor<T> imp } private void apply(final EntityLocal entity, final ConfigBag params) { - AttributeSensor<T> sensor = addSensor(entity); if (LOG.isDebugEnabled()) { LOG.debug("Adding workflow sensor {} to {}", sensor.getName(), entity); } + WorkflowPollCallable wc = new WorkflowPollCallable("Workflow for sensor " + sensor.getName(), params, null); FunctionPollConfig<Object,Object> pollConfig = new FunctionPollConfig<Object,T>(sensor) - .callable(new WorkflowPollCallable("Workflow for sensor " + sensor.getName(), entity, params)) + .callable(wc) .onSuccess(TypeCoercions.<T>function((Class)sensor.getTypeToken().getRawType())); standardPollConfig(entity, initParams(), pollConfig); @@ -103,8 +105,8 @@ public final class WorkflowSensor<T> extends AbstractAddTriggerableSensor<T> imp .poll(pollConfig); FunctionFeed feed = feedBuilder.build(); + wc.init(feed); entity.addFeed(feed); - } @Override @@ -145,18 +147,28 @@ public final class WorkflowSensor<T> extends AbstractAddTriggerableSensor<T> imp static class WorkflowPollCallable implements Callable<Object> { private final String workflowCallableName; - private final BrooklynObject entityOrAdjunct; - private final ConfigBag params; + private BrooklynObject entityOrAdjunct; + private final Map<String,Object> params; - public WorkflowPollCallable(String workflowCallableName, BrooklynObject entityOrAdjunct, ConfigBag params) { + protected WorkflowPollCallable(String workflowCallableName, ConfigBag params, BrooklynObject entityOrAdjunct) { this.workflowCallableName = workflowCallableName; + this.params = params.getAllConfigRaw(); + this.entityOrAdjunct = entityOrAdjunct; + } + + /** used in some places where the entity/adjunct is set late */ + public void init(BrooklynObject entityOrAdjunct) { this.entityOrAdjunct = entityOrAdjunct; - this.params = params; } @Override public Object call() throws Exception { - WorkflowExecutionContext wc = WorkflowExecutionContext.newInstancePersisted(entityOrAdjunct, workflowCallableName, params, null, null, null); + BrooklynObject entityOrAdjunct = this.entityOrAdjunct; + if (entityOrAdjunct==null) entityOrAdjunct = BrooklynTaskTags.getContextEntityAdjunct(Tasks.current(), false); + if (entityOrAdjunct==null) entityOrAdjunct = BrooklynTaskTags.getContextEntity(Tasks.current()); + if (entityOrAdjunct==null) throw new IllegalStateException("No entity adjunct or entity available for "+this); + + WorkflowExecutionContext wc = WorkflowExecutionContext.newInstancePersisted(entityOrAdjunct, workflowCallableName, ConfigBag.newInstance(params), null, null, null); Task<Object> wt = wc.getTask(false /* condition checked by poll config framework */).get(); if (entityOrAdjunct instanceof EntityAdjunct) { // add tag to each task so it shows up in list on mgmt context @@ -184,7 +196,13 @@ public final class WorkflowSensor<T> extends AbstractAddTriggerableSensor<T> imp } } - protected void onStart() {} - protected void onEnd() {} + @Override + public String toString() { + return "WorkflowPollCallable{" + + "workflowCallableName='" + workflowCallableName + '\'' + + ", context=" + entityOrAdjunct + + ", params=" + params + + '}'; + } } }
