This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 99a06824ec608bfa5238bdf2fb9f83281c49fe9d Author: Alex Heneveld <[email protected]> AuthorDate: Thu Nov 10 11:50:14 2022 +0000 better handling of complex args, reporting of tasks/workflow-policy --- .../brooklyn/camp/brooklyn/WorkflowYamlTest.java | 68 +++++++++++++++++++++- .../java/org/apache/brooklyn/core/feed/Poller.java | 15 +++-- .../brooklyn/core/objs/AbstractEntityAdjunct.java | 6 ++ .../workflow/WorkflowExpressionResolution.java | 3 +- .../brooklyn/core/workflow/WorkflowPolicy.java | 12 +++- .../brooklyn/core/workflow/WorkflowSensor.java | 26 ++++++++- .../util/core/json/ShellEnvironmentSerializer.java | 17 +++++- .../apache/brooklyn/util/core/task/BasicTask.java | 12 ++-- 8 files changed, 140 insertions(+), 19 deletions(-) diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java index be6af46518..e043c93b03 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.Iterables; import org.apache.brooklyn.api.effector.Effector; import org.apache.brooklyn.api.entity.Application; import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.location.LocationSpec; import org.apache.brooklyn.api.location.MachineLocation; @@ -36,7 +37,6 @@ import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.api.typereg.RegisteredType; import org.apache.brooklyn.camp.brooklyn.spi.dsl.DslUtils; import org.apache.brooklyn.camp.brooklyn.spi.dsl.methods.BrooklynDslCommon; -import org.apache.brooklyn.camp.brooklyn.spi.dsl.parse.DslParser; import org.apache.brooklyn.core.config.ConfigKeys; import 
org.apache.brooklyn.core.entity.*; import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; @@ -52,6 +52,7 @@ import org.apache.brooklyn.core.workflow.steps.LogWorkflowStep; import org.apache.brooklyn.core.workflow.store.WorkflowStatePersistenceViaSensors; import org.apache.brooklyn.entity.software.base.EmptySoftwareProcess; import org.apache.brooklyn.entity.software.base.WorkflowSoftwareProcess; +import org.apache.brooklyn.entity.stock.BasicApplication; import org.apache.brooklyn.entity.stock.BasicEntity; import org.apache.brooklyn.location.byon.FixedListMachineProvisioningLocation; import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation; @@ -737,5 +738,70 @@ public class WorkflowYamlTest extends AbstractYamlTest { Asserts.assertEquals(output.toString().trim(), message + " Entity:"+app.getChildren().iterator().next().getId()+":"+"world:Arg1:{\"x\":\"Arg1\"}"); } + @Test + public void testEffectorArgDslInMap() { + BrooklynDslCommon.registerSerializationHooks(); + BasicApplication app = mgmt().getEntityManager().createEntity(EntitySpec.create(BasicApplication.class).configure("z", "Z")); + + WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance() + .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflowEffector3") + .configure(WorkflowEffector.EFFECTOR_PARAMETER_DEFS, MutableMap.of("x", MutableMap.of("type", "map"))) + .configure(WorkflowEffector.STEPS, MutableList.of("return ${x}"))); + eff.apply((EntityLocal)app); + eff = new WorkflowEffector(ConfigBag.newInstance() + .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflowEffector2") + .configure(WorkflowEffector.EFFECTOR_PARAMETER_DEFS, MutableMap.of("x", MutableMap.of())) + .configure(WorkflowEffector.STEPS, MutableList.of(MutableMap.of("step", "invoke-effector myWorkflowEffector3", + "args", MutableMap.of("x", "${x}"))))); + eff.apply((EntityLocal)app); + eff = new WorkflowEffector(ConfigBag.newInstance() + .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflowEffector1") 
+ .configure(WorkflowEffector.EFFECTOR_PARAMETER_DEFS, MutableMap.of("x", MutableMap.of())) + .configure(WorkflowEffector.STEPS, MutableList.of(MutableMap.of("step", "invoke-effector myWorkflowEffector2", + "args", MutableMap.of("x", MutableMap.of("y", "$brooklyn:config(\"z\")")))))); + eff.apply((EntityLocal)app); + + Task<?> invocation = app.invoke(app.getEntityType().getEffectorByName("myWorkflowEffector1").get(), + MutableMap.of( +// "x", MutableMap.of("y", DslUtils.parseBrooklynDsl(mgmt(), "$brooklyn:config(\"z\")")) + )); + Asserts.assertEquals(invocation.getUnchecked(), MutableMap.of("y","Z")); + } + + @Test(groups="Live") + public void testEffectorSshEnvArgDslInMap() { + BrooklynDslCommon.registerSerializationHooks(); + TestApplication app = mgmt().getEntityManager().createEntity(EntitySpec.create(TestApplication.class).configure("z", "Z")); + + EmptySoftwareProcess child = app.createAndManageChild(EntitySpec.create(EmptySoftwareProcess.class).location(LocationSpec.create(LocalhostMachineProvisioningLocation.class))); + app.start(ImmutableList.of()); + + WorkflowEffector eff = new WorkflowEffector(ConfigBag.newInstance() + .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflowEffector3") + .configure(WorkflowEffector.EFFECTOR_PARAMETER_DEFS, MutableMap.of("script", MutableMap.of(), "env", MutableMap.of("defaultValue", MutableMap.of()))) + .configure(WorkflowEffector.STEPS, MutableList.of( + MutableMap.of("type", "ssh", + "command", "bash -c \"${script}\"", + "env", "${env}"), + "return ${stdout}"))); + eff.apply((EntityLocal)child); + eff = new WorkflowEffector(ConfigBag.newInstance() + .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflowEffector2") + .configure(WorkflowEffector.EFFECTOR_PARAMETER_DEFS, MutableMap.of("script", MutableMap.of(), "env", MutableMap.of("defaultValue", MutableMap.of()))) + .configure(WorkflowEffector.STEPS, MutableList.of(MutableMap.of("step", "invoke-effector myWorkflowEffector3", + "args", MutableMap.of("script", "${scriot}", 
"env", "${env}"))))); + eff.apply((EntityLocal)child); + eff = new WorkflowEffector(ConfigBag.newInstance() + .configure(WorkflowEffector.EFFECTOR_NAME, "myWorkflowEffector1") + .configure(WorkflowEffector.STEPS, MutableList.of(MutableMap.of("step", "invoke-effector myWorkflowEffector2", + "args", MutableMap.of("script", "echo Y is $Y", "env", MutableMap.of("Y", "$brooklyn:config(\"z\")")))))); + eff.apply((EntityLocal)child); + + Task<?> invocation = child.invoke(child.getEntityType().getEffectorByName("myWorkflowEffector1").get(), + MutableMap.of( +// "x", MutableMap.of("y", DslUtils.parseBrooklynDsl(mgmt(), "$brooklyn:config(\"z\")")) + )); + Asserts.assertEquals(invocation.getUnchecked().toString().trim(), "Y is Z"); + } } diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java index e8b4b117aa..317a4e7881 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java @@ -45,6 +45,7 @@ import org.apache.brooklyn.util.core.task.DynamicSequentialTask; import org.apache.brooklyn.util.core.task.ScheduledTask; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; @@ -228,7 +229,7 @@ public class Poller<V> { } Duration minPeriod = null; - Set<String> sensors = MutableSet.of(); + Set<String> sensorSummaries = MutableSet.of(); for (final PollJob<V> pollJob : pollJobs) { final String scheduleName = (adjunct !=null ? 
adjunct.getDisplayName()+", " : "") +pollJob.handler.getDescription(); boolean added = false; @@ -275,7 +276,9 @@ public class Poller<V> { throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already has subscription %s", this, entity, pollJob.subscription)); } - sensors.add(pollJob.pollTriggerSensor.getName()); + String summary = pollJob.pollTriggerSensor.getName(); + if (pollJob.pollTriggerEntity!=null && !pollJob.pollTriggerEntity.equals(entity)) summary += " on "+pollJob.pollTriggerEntity; + sensorSummaries.add(summary); pollJob.subscription = adjunct.subscriptions().subscribe(pollJob.pollTriggerEntity !=null ? pollJob.pollTriggerEntity : adjunct.getEntity(), pollJob.pollTriggerSensor, event -> { // submit this on every event try { @@ -292,17 +295,17 @@ public class Poller<V> { } if (adjunct !=null) { - if (sensors.isEmpty()) { - if (minPeriod==null) { + if (sensorSummaries.isEmpty()) { + if (minPeriod==null || minPeriod.equals(Duration.PRACTICALLY_FOREVER) || !minPeriod.isPositive()) { adjunct.highlightTriggers("Not configured with a period or triggers"); } else { highlightTriggerPeriod(minPeriod); } } else if (minPeriod==null) { - adjunct.highlightTriggers("Triggered by: "+sensors); + adjunct.highlightTriggers("Triggered by: "+ Strings.join(sensorSummaries, "; ")); } else { // both - adjunct.highlightTriggers("Running every "+minPeriod+" and on triggers: "+sensors); + adjunct.highlightTriggers("Running every "+minPeriod+" and on triggers: "+Strings.join(sensorSummaries, "; ")); } } } diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java index 105e68edc6..ba6cda9290 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import 
javax.annotation.Nullable; +import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.entity.Group; @@ -389,6 +390,11 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple @Override public String getDisplayName() { if (name!=null && name.length()>0) return name; + return getDefaultDisplayName(); + } + + @JsonIgnore + protected String getDefaultDisplayName() { return getClass().getCanonicalName(); } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java index 7a623396dd..55e4e963f3 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java @@ -476,7 +476,8 @@ public class WorkflowExpressionResolution { if (useWrappedValue) { return new WrappedResolvedExpression<Object>(expression, result); } else { - // we try, but don't guarantee, that DSL expressions aren't re-resolved, ie $brooklyn:literal("$brooklyn:literal(\"x\")") won't return x + // we try, but don't guarantee, that DSL expressions aren't re-resolved, ie $brooklyn:literal("$brooklyn:literal(\"x\")") won't return x; + // this block will pass the result through processDslComponents result = processDslComponents(result); } } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java index f08485e634..8ba24851a8 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowPolicy.java @@ -99,8 +99,13 @@ public final class WorkflowPolicy<T> extends AbstractPolicy { } public String getDescription() { - // TODO - return "workflow policy"; + // more info? 
customizable? + return "Policy to run a workflow on an event"; + } + + @Override + protected String getDefaultDisplayName() { + return "Workflow policy"; } @Override @@ -117,7 +122,8 @@ public final class WorkflowPolicy<T> extends AbstractPolicy { .condition(new ConditionSupplierFromAdjunct()); Set<PollConfig> pollConfigs = MutableSet.of(pc); - poller.schedulePoll(this, pollConfigs, new WorkflowSensor.WorkflowPollCallable("Workflow for policy "+this, this, config().getBag()), new PolicyNoOpPollHandler()); + poller.schedulePoll(this, pollConfigs, new WorkflowSensor.WorkflowPollCallable( + getDisplayName() + " (workflow)", this, config().getBag()), new PolicyNoOpPollHandler()); if (!isSuspended()) resume(); } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java index d70e0d5f26..6521e494d4 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowSensor.java @@ -39,6 +39,7 @@ import org.apache.brooklyn.feed.function.FunctionPollConfig; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.guava.Maybe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,8 +155,29 @@ public final class WorkflowSensor<T> extends AbstractAddTriggerableSensor<T> imp @Override public Object call() throws Exception { WorkflowExecutionContext wc = WorkflowExecutionContext.newInstancePersisted(entityOrAdjunct, workflowCallableName, params, null, null, null); - Maybe<Task<Object>> wt = wc.getTask(false /* condition checked by poll config framework */); - return DynamicTasks.queue(wt.get()).getUnchecked(); + Task<Object> wt = wc.getTask(false /* condition checked by poll config 
framework */).get(); + if (entityOrAdjunct instanceof WorkflowPolicy) { + ((WorkflowPolicy)entityOrAdjunct).highlightAction("Workflow running", wt); + } + try { + Object result = DynamicTasks.queue(wt).getUnchecked(); + + if (entityOrAdjunct instanceof WorkflowPolicy) { + ((WorkflowPolicy)entityOrAdjunct).highlightAction("Workflow run (success)", wt); + } + + return result; + } catch (Exception e) { + if (entityOrAdjunct instanceof WorkflowPolicy) { + ((WorkflowPolicy)entityOrAdjunct).highlightAction("Workflow run, with error: "+ + Exceptions.collapseText(e), wt); + } + + throw e; + } } + + protected void onStart() {} + protected void onEnd() {} } } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/json/ShellEnvironmentSerializer.java b/core/src/main/java/org/apache/brooklyn/util/core/json/ShellEnvironmentSerializer.java index c42e6dbc24..7a2ce38f5d 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/json/ShellEnvironmentSerializer.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/json/ShellEnvironmentSerializer.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; import org.apache.brooklyn.api.mgmt.ManagementContext; +import org.apache.brooklyn.util.core.task.DeferredSupplier; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.text.StringEscapes; import org.apache.commons.lang3.StringUtils; @@ -26,18 +27,32 @@ import org.apache.commons.lang3.StringUtils; import javax.annotation.Nullable; import java.util.Map; import java.util.Map.Entry; +import java.util.function.Function; public class ShellEnvironmentSerializer { private ObjectMapper mapper; - + private final Function<Object, Object> resolver; + public ShellEnvironmentSerializer(ManagementContext mgmt) { + this(mgmt, null); + } + public ShellEnvironmentSerializer(ManagementContext mgmt, Function<Object,Object> resolver) { mapper 
= BrooklynObjectsJsonMapper.newMapper(mgmt); + this.resolver = resolver; } public String serialize(Object value) { if (value == null) return null; if (value instanceof String) return (String)value; try { + if (value instanceof DeferredSupplier) { + if (resolver!=null) { + value = resolver.apply(value); + } else { + // could warn, because this probably isn't intended, but it might be + // throw new IllegalStateException("Cannot pass deferred suppliers to shell environment without a resolve function."); + } + } String str = mapper.writeValueAsString(value); if (isJsonString(str)) { // previously (2022-06) we would just write value.toString() in this block; but some things are serialized more nicely than toString, so prefer that format diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java index 625bf89fbb..485238d9c8 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicTask.java @@ -737,11 +737,13 @@ public class BasicTask<T> implements TaskInternal<T> { msg += "("+ti.getThreadState()+") on "+lookup(lock); } } - if (data.hasBlockingDetails) { - // if already has blocking details include this with lower priority - data.multiLineData.add(msg); - } else { - data.oneLineData.add(", "+Strings.toInitialLowerCase(msg)); + if (msg!=null) { + if (data.hasBlockingDetails) { + // if already has blocking details include this with lower priority + data.multiLineData.add(msg); + } else { + data.oneLineData.add(", " + Strings.toInitialLowerCase(msg)); + } } data.hasBlockingDetails = true; }
