This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
The following commit(s) were added to refs/heads/master by this push:
new 439ca84e58 allow to use workflow and expressions to generate the name
of dynamic multi groups
439ca84e58 is described below
commit 439ca84e58da38e8fe8f083a0fdb0747ca819de0
Author: Alex Heneveld <[email protected]>
AuthorDate: Thu Dec 1 22:01:10 2022 +0000
allow to use workflow and expressions to generate the name of dynamic multi
groups
---
.../brooklyn/entity/group/DynamicMultiGroup.java | 18 +++++++-
.../entity/group/DynamicMultiGroupImpl.java | 48 ++++++++++++++++++++--
.../entity/group/DynamicMultiGroupTest.java | 28 +++++++++++--
3 files changed, 85 insertions(+), 9 deletions(-)
diff --git
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
index 2ce91c960b..4410ac184a 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
@@ -28,6 +28,7 @@ import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import com.google.common.annotations.Beta;
@@ -49,7 +50,21 @@ public interface DynamicMultiGroup extends DynamicGroup {
ConfigKey<Function<Entity, String>> BUCKET_FUNCTION =
ConfigKeys.newConfigKey(
new TypeToken<Function<Entity, String>>(){},
"brooklyn.multigroup.bucketFunction",
- "Implements the mapping from entity to bucket (name)"
+ "Function to return the bucket (name) an entity should be placed
in"
+ );
+
+ @SetFromFlag("bucketWorkflow")
+ ConfigKey<CustomWorkflowStep> BUCKET_WORKFLOW = ConfigKeys.newConfigKey(
+ CustomWorkflowStep.class,
+ "brooklyn.multigroup.bucketWorkflow",
+ "Workflow to return the bucket (name) an entity should be placed
in"
+ );
+
+ @SetFromFlag("bucketExpression")
+ ConfigKey<String> BUCKET_EXPRESSION = ConfigKeys.newConfigKey(
+ String.class,
+ "brooklyn.multigroup.bucketExpression",
+ "Freemarker template expression to return the bucket (name) an
entity should be placed in"
);
@SetFromFlag("bucketIdFunction")
@@ -104,7 +119,6 @@ public interface DynamicMultiGroup extends DynamicGroup {
*
* @see #ENTITY_FILTER
* @see #BUCKET_FUNCTION
- * @see #GROUP_SPEC
*/
void distributeEntities();
diff --git
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
index 5f652a6039..7a4e7d674e 100644
---
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
+++
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
@@ -29,16 +29,23 @@ import javax.annotation.Nullable;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
import
org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic;
+import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep;
import org.apache.brooklyn.feed.function.FunctionFeed;
import org.apache.brooklyn.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,17 +233,50 @@ public class DynamicMultiGroupImpl extends
DynamicGroupImpl implements DynamicMu
synchronized (memberChangeMutex) {
if (Entities.isNoLongerManaged(this)) return;
- Function<Entity, String> bucketFunction =
getConfig(BUCKET_FUNCTION);
+ Function<Entity, String> bucketFunctionF =
getConfig(BUCKET_FUNCTION);
+ CustomWorkflowStep bucketFunctionW = getConfig(BUCKET_WORKFLOW);
+ String bucketFunctionE = getConfig(BUCKET_EXPRESSION);
+ if (bucketFunctionF == null && bucketFunctionW == null &&
bucketFunctionE == null) return;
+
+ if (bucketFunctionE != null) {
+ if (bucketFunctionW != null) LOG.warn("Ignoring bucket
workflow because expression supplied");
+ bucketFunctionW = TypeCoercions.coerce(MutableMap.of("steps",
MutableList.of("return "+bucketFunctionE)), CustomWorkflowStep.class);
+ }
+ if (bucketFunctionW != null) {
+ if (bucketFunctionF != null) LOG.warn("Ignoring bucket
function because workflow or expression supplied");
+ CustomWorkflowStep bucketFunctionW2 = bucketFunctionW;
+ bucketFunctionF = entity -> {
+ Maybe<Task<Object>> t =
bucketFunctionW2.newWorkflowExecution(entity, "Workflow to find bucket name for
" + entity, null).getTask(true);
+ if (t.isAbsent()) {
+ LOG.debug("Entity " + entity + " does not match
condition to placed in the dynamic multigroup");
+ return null;
+ }
+ try {
+ return
TypeCoercions.coerce(DynamicTasks.submit(t.get(), entity).getUnchecked(),
String.class);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ LOG.warn("Entity " + entity + " failed when trying to
evaluate the bucket it goes in; not putting into a bucket");
+ return null;
+ }
+ };
+ }
+ if (bucketFunctionF == null) {
+ LOG.warn(this+" should have exactly one of: a bucket
expression, workflow, or function");
+ return;
+ }
+
Function<Entity, String> bucketIdFunction =
getConfig(BUCKET_ID_FUNCTION);
EntitySpec<? extends BasicGroup> bucketSpec =
getConfig(BUCKET_SPEC);
- if (bucketFunction == null || bucketSpec == null) return;
+ if (bucketSpec == null) return;
Map<String, BasicGroup> buckets =
MutableMap.copyOf(getAttribute(BUCKETS));
// Bucketize the members where the function gives a non-null bucket
- Multimap<String, Entity> entityMapping = Multimaps.index(
- Iterables.filter(getMembers(),
Predicates.compose(Predicates.notNull(), bucketFunction)), bucketFunction);
+ Function<Entity, String> bucketFunctionF2 = bucketFunctionF;
+ Multimap<String, Entity> entityMapping =
getMembers().stream().collect(() -> Multimaps.newSetMultimap(MutableMap.of(),
MutableSet::new),
+ (map,entity) -> { String name =
bucketFunctionF2.apply(entity); if (Strings.isNonBlank(name)) map.put(name,
entity); },
+ (m1,m2) -> m1.putAll(m2));
// Now fill the buckets
Collection<Entity> oldChildren = getChildren();
diff --git
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
index f7ae52f922..dd6e30eea9 100644
---
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
+++
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
@@ -22,8 +22,7 @@ import static com.google.common.base.Predicates.instanceOf;
import static com.google.common.collect.Iterables.find;
import static
org.apache.brooklyn.core.entity.EntityPredicates.displayNameEqualTo;
import static org.apache.brooklyn.entity.group.DynamicGroup.ENTITY_FILTER;
-import static
org.apache.brooklyn.entity.group.DynamicMultiGroup.BUCKET_FUNCTION;
-import static
org.apache.brooklyn.entity.group.DynamicMultiGroup.RESCAN_INTERVAL;
+import static org.apache.brooklyn.entity.group.DynamicMultiGroup.*;
import static
org.apache.brooklyn.entity.group.DynamicMultiGroupImpl.bucketFromAttribute;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -38,7 +37,13 @@ import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.core.workflow.WorkflowBasicTest;
+import org.apache.brooklyn.core.workflow.steps.CustomWorkflowStep;
import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableSet;
@@ -49,12 +54,29 @@ public class DynamicMultiGroupTest extends
BrooklynAppUnitTestSupport {
@Test
public void testBucketDistributionFromSubscription() {
+
doTestBucketDistributionFromSubscription(ConfigBag.newInstance().configure(BUCKET_FUNCTION,
bucketFromAttribute(SENSOR)));
+ }
+
+ @Test
+ public void testBucketDistributionFromSubscriptionWithWorkflow() {
+ WorkflowBasicTest.addWorkflowStepTypes(mgmt);
+
doTestBucketDistributionFromSubscription(ConfigBag.newInstance().configure(BUCKET_WORKFLOW,
+ TypeCoercions.coerce(MutableMap.of("steps",
MutableList.of("let x = ${entity.sensor['"+SENSOR.getName()+"']} ?? \"\"",
"return ${x}")), CustomWorkflowStep.class) ));
+ }
+
+ @Test
+ public void testBucketDistributionFromSubscriptionWithWorkflowExpression()
{
+ WorkflowBasicTest.addWorkflowStepTypes(mgmt);
+
doTestBucketDistributionFromSubscription(ConfigBag.newInstance().configure(BUCKET_EXPRESSION,
"${entity.sensor['"+SENSOR.getName()+"']}"));
+ }
+
+ void doTestBucketDistributionFromSubscription(ConfigBag config) {
Group group =
app.createAndManageChild(EntitySpec.create(BasicGroup.class));
final DynamicMultiGroup dmg = app.createAndManageChild(
EntitySpec.create(DynamicMultiGroup.class)
.configure(ENTITY_FILTER, instanceOf(TestEntity.class))
- .configure(BUCKET_FUNCTION,
bucketFromAttribute(SENSOR))
.configure(DynamicMultiGroup.BUCKET_ID_FUNCTION,
bucketFromAttribute(SENSOR))
+ .configure(config.getAllConfig())
);
app.subscriptions().subscribeToChildren(group, SENSOR, new
SensorEventListener<String>() {
@Override