This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 25f92a0b95828b3866ed23b024a90f91dcce44de Author: Alex Heneveld <[email protected]> AuthorDate: Fri Dec 2 10:09:51 2022 +0000 support a workflow initializer --- .../BrooklynCampPlatformLauncherAbstract.java | 7 +- .../brooklyn/camp/brooklyn/WorkflowYamlTest.java | 74 ++++++++++++ .../internal/NonDeploymentManagementContext.java | 3 +- .../core/workflow/WorkflowInitializer.java | 132 +++++++++++++++++++++ .../brooklyn/core/workflow/WorkflowBasicTest.java | 2 + karaf/init/src/main/resources/catalog.bom | 5 + 6 files changed, 221 insertions(+), 2 deletions(-) diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/BrooklynCampPlatformLauncherAbstract.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/BrooklynCampPlatformLauncherAbstract.java index 1fd05804f1..3a91f5498d 100644 --- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/BrooklynCampPlatformLauncherAbstract.java +++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/BrooklynCampPlatformLauncherAbstract.java @@ -51,10 +51,15 @@ public abstract class BrooklynCampPlatformLauncherAbstract { PlatformRootSummary.builder().name("Brooklyn CAMP Platform").build(), getBrooklynMgmt()) .setConfigKeyAtManagmentContext(); - + + markStartupComplete(); return this; } + protected void markStartupComplete() { + ((LocalManagementContext)mgmt).noteStartupComplete(); + } + protected LocalManagementContext newMgmtContext() { return new LocalManagementContext(); } diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java index cf0797c29d..68d0d6cff5 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/WorkflowYamlTest.java @@ -807,4 +807,78 @@ public class WorkflowYamlTest extends AbstractYamlTest { Asserts.assertEquals(invocation.getUnchecked().toString().trim(), "Y is Z"); } + @Test + public void testInitializer() throws Exception { + Entity app = createAndStartApplication( + "services:", + "- type: " + BasicEntity.class.getName(), + " brooklyn.initializers:", + " - type: workflow-initializer", + " brooklyn.config:", + " name: myWorkflow", + " steps:", + " - set-sensor boolean initializer_ran = true"); + waitForApplicationTasks(app); + Entity entity = Iterables.getOnlyElement(app.getChildren()); + EntityAsserts.assertAttributeEquals(entity, Sensors.newSensor(Object.class, "initializer_ran"), true); + } + + @Test + public void testInitializerDelay() throws Exception { + Entity app = createAndStartApplication( + "services:", + "- type: " + BasicEntity.class.getName(), + " brooklyn.initializers:", + " - type: workflow-initializer", + " brooklyn.config:", + " name: post-init", + " delay: async", + " steps:", + " - let x = ${entity.sensor.x} * 2", + " - set-sensor x = ${x}", + + " - type: workflow-initializer", + " brooklyn.config:", + " name: pre-init", + " steps:", + " - set-sensor integer x = 3"); + waitForApplicationTasks(app); + Entity entity = Iterables.getOnlyElement(app.getChildren()); + EntityAsserts.assertAttributeEquals(entity, Sensors.newIntegerSensor("x"), 6); + } + + @Test(groups="Integration") //because of 500ms delay + public void testInitializerDelayDuration() throws Exception { + Entity app = createAndStartApplication( + "services:", + "- type: " + BasicEntity.class.getName(), + " brooklyn.initializers:", + + " - type: workflow-initializer", + " brooklyn.config:", + " name: post-init-2", + " delay: 500ms", + " steps:", + " - let x = ${entity.sensor.x} + 1", // will cause 7 if runs after the other post-init (desired); problems: 8 if before, and 4 or 6 if they race + " - set-sensor x = ${x}", + + " - type: workflow-initializer", + " brooklyn.config:", + " name: post-init", + " delay: async", + " steps:", + " - let x = ${entity.sensor.x} * 2", + " - set-sensor x = ${x}", + + " - type: workflow-initializer", + " brooklyn.config:", + " name: pre-init", + " steps:", + " - set-sensor integer x = 3"); + waitForApplicationTasks(app); + + Entity entity = Iterables.getOnlyElement(app.getChildren()); + EntityAsserts.assertAttributeEqualsEventually(entity, Sensors.newIntegerSensor("x"), 7); + } + } diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java index a23bc04a14..bf3acc6c80 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/NonDeploymentManagementContext.java @@ -181,7 +181,8 @@ public class NonDeploymentManagementContext implements ManagementContextInternal } @Override public void waitForManagementStartupComplete(Duration timeout) { - throw new IllegalStateException("Cannot wait for startup on a non-deployment context"); + checkInitialManagementContextReal(); + initialManagementContext.waitForManagementStartupComplete(timeout); } @Override diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java new file mode 100644 index 0000000000..5c1c82b0b0 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.workflow; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityInitializers; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; +import org.apache.brooklyn.core.mgmt.EntityManagementUtils; +import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext; +import org.apache.brooklyn.util.collections.MutableList; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.text.Strings; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.Callable; + +public class WorkflowInitializer extends EntityInitializers.InitializerPatternWithConfigKeys implements WorkflowCommonConfig { + + public static final Logger log = LoggerFactory.getLogger(WorkflowInitializer.class); + + public static final ConfigKey<String> WORKFLOW_NAME = ConfigKeys.newStringConfigKey("name", "Name of the workflow to run as part of entity initialization", "Workflow initializer"); + public static final ConfigKey<Object> DELAY = ConfigKeys.newConfigKey(Object.class, "delay", "Either false to run synchronously during entity initialization (the default), " + + "true to run after management is fully started, or a duration to delay that long before running"); + + private EntityLocal entity; + + public WorkflowInitializer() {} + public WorkflowInitializer(ConfigBag params) { super(params); } + public WorkflowInitializer(Map<?, ?> params) { + this(ConfigBag.newInstance(params)); + } + + @Override + public void apply(EntityLocal entity) { + this.entity = entity; + + Object delay = initParams().get(DELAY); + boolean delayed; + Duration delayDuration; + if (delay==null || Boolean.FALSE.equals(delay) || (delay instanceof String && (Strings.isBlank((String) delay) || "false".equalsIgnoreCase((String) delay) || "sync".equalsIgnoreCase((String) delay)))) { + delayed = false; + delayDuration = null; + } else { + delayed = true; + if (Boolean.TRUE.equals(delay) || (delay instanceof String && ("true".equalsIgnoreCase((String) delay) || "async".equalsIgnoreCase((String) delay)))) { + delayDuration = Duration.ZERO; + } else { + delayDuration = Duration.parse(delay.toString()); + } + } + String delaySummary = delayed ? ((delayDuration.isPositive() ? ""+delayDuration+" " : "") + + "after management start") : null; + + Callable<Object> callable = () -> { + try { + WorkflowExecutionContext w = WorkflowExecutionContext.newInstancePersisted(entity, WorkflowExecutionContext.WorkflowContextType.OTHER, + initParam(WORKFLOW_NAME) + (delayed ? " (" + delaySummary + ")" : ""), + ConfigBag.newInstanceCopying(initParams()), + null, null, MutableMap.of("tags", MutableList.of(BrooklynTaskTags.ENTITY_INITIALIZATION))); + + Maybe<Task<Object>> task = w.getTask(true); + + if (task.isAbsent()) { + log.debug("Skipping workflow initializer on " + entity + ", condition not met: " + initParams()); + if (delayed) DynamicTasks.queue(Tasks.warning("Skipping: condition not met", null)); + return null; + + } else { + log.debug("Submitting workflow initializer on " + entity + ": " + initParams()); + w.persist(); + if (delayed) { + DynamicTasks.queue(Tasks.create("Delaying until " + delaySummary, () -> { + ((EntityInternal) entity).getManagementContext().waitForManagementStartupComplete(null); + if (delayDuration.isPositive()) Time.sleep(delayDuration); + })); + } + Task<Object> submitted = delayed ? DynamicTasks.queue(task.get()) : Entities.submit(entity, task.get()); + if (delayed) DynamicTasks.waitForLast(); + Object result = submitted.getUnchecked(); + log.debug("Applied workflow initializer on " + entity + ", result: " + result); + return result; + } + } catch (Exception e) { + log.warn("Error running workflow initializer (rethrowing): "+e, e); + throw Exceptions.propagate(e); + } + }; + + if (delayed) { + Entities.submit(entity, Tasks.builder().displayName(initParam(WORKFLOW_NAME)).dynamic(true).body(callable).build()); + + } else { + try { + callable.call(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + + } + +} diff --git a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java index f9e41d9da7..771472ad7f 100644 --- a/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/workflow/WorkflowBasicTest.java @@ -21,6 +21,7 @@ package org.apache.brooklyn.core.workflow; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.reflect.TypeToken; import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityInitializer; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.mgmt.ManagementContext; @@ -127,6 +128,7 @@ public class WorkflowBasicTest extends BrooklynMgmtUnitTestSupport { addRegisteredTypeBean(mgmt, "workflow-effector", WorkflowEffector.class); addRegisteredTypeBean(mgmt, "workflow-sensor", WorkflowSensor.class); addRegisteredTypeSpec(mgmt, "workflow-policy", WorkflowPolicy.class, Policy.class); + addRegisteredTypeBean(mgmt, "workflow-initializer", WorkflowInitializer.class); } public static WorkflowExecutionContext runWorkflow(Entity target, String workflowYaml, String defaultName) { diff --git a/karaf/init/src/main/resources/catalog.bom b/karaf/init/src/main/resources/catalog.bom index 5ec98ea9d6..3f0ccfe386 100644 --- a/karaf/init/src/main/resources/catalog.bom +++ b/karaf/init/src/main/resources/catalog.bom @@ -241,6 +241,11 @@ brooklyn.catalog: type: org.apache.brooklyn.location.winrm.WinrmWorkflowStep # workflow initializers + - id: workflow-initializer + format: java-type-name + itemType: bean + item: + type: org.apache.brooklyn.core.workflow.WorkflowInitializer - id: workflow-effector format: java-type-name itemType: bean
