This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
The following commit(s) were added to refs/heads/master by this push:
new c2d1ea600c fix some persistence issues with Dynamic MultiGroup and
PropagateToMembers
c2d1ea600c is described below
commit c2d1ea600c1c7758e03f8499d85c22890fb479ff
Author: Alex Heneveld <[email protected]>
AuthorDate: Fri Dec 2 13:13:21 2022 +0000
fix some persistence issues with Dynamic MultiGroup and PropagateToMembers
---
.../core/mgmt/internal/LocalManagementContext.java | 4 +-
.../core/objs/proxy/InternalEntityFactory.java | 3 +-
.../enricher/stock/PropagateToMembers.java | 29 ++-
.../brooklyn/entity/group/DynamicMultiGroup.java | 14 +-
.../entity/group/DynamicMultiGroupImpl.java | 217 ++++++++++++---------
.../stock/PropagateToMembersRebindTest.java | 121 ++++++++++++
.../enricher/stock/PropagateToMembersTest.java | 2 +-
.../entity/group/DynamicMultiGroupRebindTest.java | 41 +++-
.../entity/group/DynamicMultiGroupTest.java | 13 +-
9 files changed, 323 insertions(+), 121 deletions(-)
diff --git
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
index 01d45bfb90..200bde3580 100644
---
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
+++
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
@@ -246,7 +246,9 @@ public class LocalManagementContext extends
AbstractManagementContext {
@Override
public synchronized LocalEntityManager getEntityManager() {
- if (!isRunning()) throw new IllegalStateException("Management context
no longer running");
+ if (!isRunning()) {
+ throw new IllegalStateException("Management context no longer
running");
+ }
if (entityManager == null) {
entityManager = new LocalEntityManager(this);
diff --git
a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
index 7c2e0721bc..393c99cfb8 100644
---
a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
+++
b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
@@ -201,7 +201,8 @@ public class InternalEntityFactory extends InternalFactory {
Exceptions.propagateIfFatal(e);
log.info("Failed to initialise entity " + entity + " and its
descendants - unmanaging and propagating original exception: " +
Exceptions.collapseText(e));
try {
- ((EntityManagerInternal)
managementContext.getEntityManager()).discardPremanaged(entity);
+ if (managementContext.isRunning())
+ ((EntityManagerInternal)
managementContext.getEntityManager()).discardPremanaged(entity);
} catch (Exception e2) {
Exceptions.propagateIfFatal(e2);
log.info("Failed to unmanage entity " + entity + " and its
descendants, after failure to initialise (rethrowing original exception)", e2);
diff --git
a/core/src/main/java/org/apache/brooklyn/enricher/stock/PropagateToMembers.java
b/core/src/main/java/org/apache/brooklyn/enricher/stock/PropagateToMembers.java
index 31df91134c..2b9f325e85 100644
---
a/core/src/main/java/org/apache/brooklyn/enricher/stock/PropagateToMembers.java
+++
b/core/src/main/java/org/apache/brooklyn/enricher/stock/PropagateToMembers.java
@@ -21,6 +21,7 @@ package org.apache.brooklyn.enricher.stock;
import com.google.common.base.Predicates;
import com.google.common.reflect.TypeToken;
import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntityLocal;
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -31,6 +32,7 @@ import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.enricher.AbstractEnricher;
import org.apache.brooklyn.core.entity.trait.Changeable;
+import org.apache.brooklyn.util.collections.CollectionFunctionals;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +53,7 @@ public class PropagateToMembers extends AbstractEnricher
implements SensorEventL
public static final ConfigKey<Collection<? extends Sensor<?>>> PROPAGATING
= ConfigKeys.builder(new TypeToken<Collection<? extends Sensor<?>>>() {})
.name("enricher.propagating.inclusions")
.description("Collection of sensors to propagate to members")
- .constraint(Predicates.and(Objects::nonNull, sensors ->
!sensors.isEmpty()))
+ .constraint(Predicates.and(Predicates.notNull(),
CollectionFunctionals.notEmpty()))
.build();
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -63,19 +65,30 @@ public class PropagateToMembers extends AbstractEnricher
implements SensorEventL
throw new IllegalStateException("Only valid for groups");
}
- subscriptions().subscribe(entity, Changeable.MEMBER_ADDED, event -> {
- LOG.debug("Propagating '{}' to a new member '{}' ",
getConfig(PROPAGATING), event.getValue());
- getConfig(PROPAGATING).forEach(sensor -> {
+ subscriptions().subscribe(entity, Changeable.MEMBER_ADDED, new
PropagationSubscriber(this));
+
+ getConfig(PROPAGATING).forEach(sensor ->
subscriptions().subscribe(entity, sensor, this));
+ }
+
+ static class PropagationSubscriber implements SensorEventListener<Entity> {
+ private final PropagateToMembers adjunct;
+
+ public PropagationSubscriber(PropagateToMembers adjunct) {
+ this.adjunct = adjunct;
+ }
+
+ @Override
+ public void onEvent(SensorEvent<Entity> event) {
+ LOG.debug("Propagating '{}' to a new member '{}' ",
adjunct.getConfig(PROPAGATING), event.getValue());
+ adjunct.getConfig(PROPAGATING).forEach(sensor -> {
AttributeSensor attributeSensor = (AttributeSensor<?>)sensor;
- Object value = entity.getAttribute(attributeSensor);
+ Object value = adjunct.entity.getAttribute(attributeSensor);
if (!Objects.isNull(value)) {
LOG.debug("Propagating initial {}: {}", attributeSensor,
value);
event.getValue().sensors().set(attributeSensor, value);
}
});
- });
-
- getConfig(PROPAGATING).forEach(sensor ->
subscriptions().subscribe(entity, sensor, this));
+ }
}
@Override
diff --git
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
index 4410ac184a..fa77163f92 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
@@ -52,14 +52,12 @@ public interface DynamicMultiGroup extends DynamicGroup {
"brooklyn.multigroup.bucketFunction",
"Function to return the bucket (name) an entity should be placed
in"
);
-
@SetFromFlag("bucketWorkflow")
ConfigKey<CustomWorkflowStep> BUCKET_WORKFLOW = ConfigKeys.newConfigKey(
CustomWorkflowStep.class,
"brooklyn.multigroup.bucketWorkflow",
"Workflow to return the bucket (name) an entity should be placed
in"
);
-
@SetFromFlag("bucketExpression")
ConfigKey<String> BUCKET_EXPRESSION = ConfigKeys.newConfigKey(
String.class,
@@ -71,10 +69,18 @@ public interface DynamicMultiGroup extends DynamicGroup {
ConfigKey<Function<Entity, String>> BUCKET_ID_FUNCTION =
ConfigKeys.newConfigKey(
new TypeToken<Function<Entity, String>>(){},
"brooklyn.multigroup.bucketIdFunction",
- "Implements the mapping from entity to bucket ID; if supplied, the
ids of entities E1 and E2 should be the same if and only if the name returned
by bucketFunction are the same; " +
- "if not supplied, no ID is set"
+ "Used at bucket creation time to generate an ID for the bucket.
Should be unique if and only if the bucket (name) function is unique for two
entities. " +
+ "If not supplied, no ID is set."
);
+ ConfigKey<CustomWorkflowStep> BUCKET_ID_WORKFLOW = ConfigKeys.newConfigKey(
+ CustomWorkflowStep.class,
+ "brooklyn.multigroup.bucketIdWorkflow");
+
+ ConfigKey<String> BUCKET_ID_EXPRESSION = ConfigKeys.newConfigKey(
+ String.class,
+ "brooklyn.multigroup.bucketIdExpression");
+
/**
* Determines the type of {@link Group} used for the buckets.
*
diff --git
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
index 7a4e7d674e..82537ddf03 100644
---
a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
+++
b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
@@ -18,14 +18,9 @@
*/
package org.apache.brooklyn.entity.group;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.*;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.entity.Group;
@@ -50,15 +45,12 @@ import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
public class DynamicMultiGroupImpl extends DynamicGroupImpl implements
DynamicMultiGroup {
@@ -230,95 +222,132 @@ public class DynamicMultiGroupImpl extends
DynamicGroupImpl implements DynamicMu
@Override
public void distributeEntities() {
- synchronized (memberChangeMutex) {
- if (Entities.isNoLongerManaged(this)) return;
-
- Function<Entity, String> bucketFunctionF =
getConfig(BUCKET_FUNCTION);
- CustomWorkflowStep bucketFunctionW = getConfig(BUCKET_WORKFLOW);
- String bucketFunctionE = getConfig(BUCKET_EXPRESSION);
- if (bucketFunctionF == null && bucketFunctionW == null &&
bucketFunctionE == null) return;
-
- if (bucketFunctionE != null) {
- if (bucketFunctionW != null) LOG.warn("Ignoring bucket
workflow because expression supplied");
- bucketFunctionW = TypeCoercions.coerce(MutableMap.of("steps",
MutableList.of("return "+bucketFunctionE)), CustomWorkflowStep.class);
- }
- if (bucketFunctionW != null) {
- if (bucketFunctionF != null) LOG.warn("Ignoring bucket
function because workflow or expression supplied");
- CustomWorkflowStep bucketFunctionW2 = bucketFunctionW;
- bucketFunctionF = entity -> {
- Maybe<Task<Object>> t =
bucketFunctionW2.newWorkflowExecution(entity, "Workflow to find bucket name for
" + entity, null).getTask(true);
- if (t.isAbsent()) {
- LOG.debug("Entity " + entity + " does not match
condition to placed in the dynamic multigroup");
- return null;
- }
- try {
- return
TypeCoercions.coerce(DynamicTasks.submit(t.get(), entity).getUnchecked(),
String.class);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- LOG.warn("Entity " + entity + " failed when trying to
evaluate the bucket it goes in; not putting into a bucket");
- return null;
- }
- };
- }
- if (bucketFunctionF == null) {
- LOG.warn(this+" should have exactly one of: a bucket
expression, workflow, or function");
- return;
- }
+ try {
+ synchronized (memberChangeMutex) {
+ if (Entities.isUnmanagingOrNoLongerManaged(this)) return;
- Function<Entity, String> bucketIdFunction =
getConfig(BUCKET_ID_FUNCTION);
+ Function<Entity, String> bucketFunctionF =
getConfig(BUCKET_FUNCTION);
+ CustomWorkflowStep bucketFunctionW =
getConfig(BUCKET_WORKFLOW);
+ String bucketFunctionE = getConfig(BUCKET_EXPRESSION);
- EntitySpec<? extends BasicGroup> bucketSpec =
getConfig(BUCKET_SPEC);
- if (bucketSpec == null) return;
+ if (bucketFunctionE != null) {
+ if (bucketFunctionW != null) LOG.warn("Ignoring bucket
workflow because expression supplied");
+ bucketFunctionW =
TypeCoercions.coerce(MutableMap.of("steps", MutableList.of("return " +
bucketFunctionE)), CustomWorkflowStep.class);
+ }
+ if (bucketFunctionW != null) {
+ if (bucketFunctionF != null)
+ LOG.warn("Ignoring bucket function because workflow or
expression supplied");
+ bucketFunctionF = new WorkflowFunction(bucketFunctionW);
+ }
- Map<String, BasicGroup> buckets =
MutableMap.copyOf(getAttribute(BUCKETS));
+ Function<Entity, String> bucketIdFunctionF =
getConfig(BUCKET_ID_FUNCTION);
+ CustomWorkflowStep bucketIdFunctionW =
getConfig(BUCKET_ID_WORKFLOW);
+ String bucketIdFunctionE = getConfig(BUCKET_ID_EXPRESSION);
- // Bucketize the members where the function gives a non-null bucket
- Function<Entity, String> bucketFunctionF2 = bucketFunctionF;
- Multimap<String, Entity> entityMapping =
getMembers().stream().collect(() -> Multimaps.newSetMultimap(MutableMap.of(),
MutableSet::new),
- (map,entity) -> { String name =
bucketFunctionF2.apply(entity); if (Strings.isNonBlank(name)) map.put(name,
entity); },
- (m1,m2) -> m1.putAll(m2));
+ if (bucketIdFunctionE != null) {
+ if (bucketIdFunctionW != null) LOG.warn("Ignoring bucket
workflow because expression supplied");
+ bucketIdFunctionW =
TypeCoercions.coerce(MutableMap.of("steps", MutableList.of("return " +
bucketIdFunctionE)), CustomWorkflowStep.class);
+ }
+ if (bucketIdFunctionW != null) {
+ if (bucketIdFunctionF != null)
+ LOG.warn("Ignoring bucket function because workflow or
expression supplied");
+ bucketIdFunctionF = new
WorkflowFunction(bucketIdFunctionW);
+ }
- // Now fill the buckets
- Collection<Entity> oldChildren = getChildren();
- for (String name : entityMapping.keySet()) {
- BasicGroup bucket = buckets.get(name);
- if (bucket == null) {
- try {
- EntitySpec<? extends BasicGroup> spec =
EntitySpec.create(bucketSpec).displayName(name);
- if (bucketIdFunction!=null) {
- spec.configure(BrooklynConfigKeys.PLAN_ID,
bucketIdFunction.apply(entityMapping.get(name).iterator().next()));
- }
+ if (bucketFunctionF == null) {
+ if (bucketIdFunctionF!=null) {
+ bucketFunctionF = bucketIdFunctionF;
+ } else {
+ LOG.warn(this + " should have exactly one of: a bucket
expression, workflow, or function (optionally coming from the bucket ID
function)");
+ return;
+ }
+ }
- bucket = addChild(spec);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- ServiceProblemsLogic.updateProblemsIndicator(this,
"children", "Could not add child; removing all new children for now:
"+Exceptions.collapseText(e));
- // if we don't do this, they get added infinitely often
- MutableSet<Entity> newChildren =
MutableSet.copyOf(getChildren());
- newChildren.removeAll(oldChildren);
- for (Entity child: newChildren) {
- removeChild(child);
+ EntitySpec<? extends BasicGroup> bucketSpec =
getConfig(BUCKET_SPEC);
+ if (bucketSpec == null) return;
+
+ Map<String, BasicGroup> buckets =
MutableMap.copyOf(getAttribute(BUCKETS));
+
+ // Bucketize the members where the function gives a non-null
bucket
+ Function<Entity, String> bucketFunctionF2 = bucketFunctionF;
+ Multimap<String, Entity> entityMapping =
getMembers().stream().collect(() -> Multimaps.newSetMultimap(MutableMap.of(),
MutableSet::new),
+ (map, entity) -> {
+ String name = bucketFunctionF2.apply(entity);
+ if (Strings.isNonBlank(name)) map.put(name,
entity);
+ },
+ (m1, m2) -> m1.putAll(m2));
+
+ // Now fill the buckets
+ Collection<Entity> oldChildren = getChildren();
+ for (String name : entityMapping.keySet()) {
+ BasicGroup bucket = buckets.get(name);
+ if (bucket == null) {
+ try {
+ EntitySpec<? extends BasicGroup> spec =
EntitySpec.create(bucketSpec).displayName(name);
+ if (bucketIdFunctionF != null) {
+ spec.configure(BrooklynConfigKeys.PLAN_ID,
bucketIdFunctionF.apply(entityMapping.get(name).iterator().next()));
+ }
+
+ bucket = addChild(spec);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ ServiceProblemsLogic.updateProblemsIndicator(this,
"children", "Could not add child; removing all new children for now: " +
Exceptions.collapseText(e));
+ // if we don't do this, they get added infinitely
often
+ MutableSet<Entity> newChildren =
MutableSet.copyOf(getChildren());
+ newChildren.removeAll(oldChildren);
+ for (Entity child : newChildren) {
+ removeChild(child);
+ }
+ throw e;
}
- throw e;
+ ServiceProblemsLogic.clearProblemsIndicator(this,
"children");
+ buckets.put(name, bucket);
}
- ServiceProblemsLogic.clearProblemsIndicator(this,
"children");
- buckets.put(name, bucket);
+ bucket.setMembers(entityMapping.get(name));
}
- bucket.setMembers(entityMapping.get(name));
- }
- // Remove any now-empty buckets
- Set<String> empty =
ImmutableSet.copyOf(Sets.difference(buckets.keySet(), entityMapping.keySet()));
- for (String name : empty) {
- Group removed = buckets.remove(name);
- LOG.debug(this+" removing empty child-bucket "+name+" ->
"+removed);
- removeChild(removed);
- Entities.unmanage(removed);
- }
+ // Remove any now-empty buckets
+ Set<String> empty =
ImmutableSet.copyOf(Sets.difference(buckets.keySet(), entityMapping.keySet()));
+ for (String name : empty) {
+ Group removed = buckets.remove(name);
+ LOG.debug(this + " removing empty child-bucket " + name +
" -> " + removed);
+ removeChild(removed);
+ Entities.unmanage(removed);
+ }
- // Save the bucket mappings
- sensors().set(BUCKETS, ImmutableMap.copyOf(buckets));
+ // Save the bucket mappings
+ sensors().set(BUCKETS, ImmutableMap.copyOf(buckets));
+ }
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ if (Entities.isUnmanagingOrNoLongerManaged(this)) {
+ LOG.debug("Error in "+this+" when unmanaged, ignoring: "+e);
+ } else {
+ throw Exceptions.propagate(e);
+ }
}
}
+ private static class WorkflowFunction implements Function<Entity, String> {
+ private final CustomWorkflowStep workflow;
+
+ public WorkflowFunction(CustomWorkflowStep bucketFunctionW) {
+ this.workflow = bucketFunctionW;
+ }
+
+ public String apply(Entity entity) {
+ Maybe<Task<Object>> t = workflow.newWorkflowExecution(entity,
"Workflow to find bucket name for " + entity, null).getTask(true);
+ if (t.isAbsent()) {
+ LOG.debug("Entity " + entity + " does not match condition to
placed in the dynamic multigroup");
+ return null;
+ }
+ try {
+ return TypeCoercions.coerce(DynamicTasks.submit(t.get(),
entity).getUnchecked(), String.class);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ LOG.warn("Entity " + entity + " failed when trying to evaluate
the bucket it goes in; not putting into a bucket");
+ return null;
+ }
+ }
+ }
}
diff --git
a/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersRebindTest.java
b/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersRebindTest.java
new file mode 100644
index 0000000000..60328ea911
--- /dev/null
+++
b/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersRebindTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.enricher.stock;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.EnricherSpec;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.sensor.StaticSensor;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.entity.group.BasicGroup;
+import org.apache.brooklyn.entity.group.DynamicMultiGroup;
+import org.apache.brooklyn.entity.stock.BasicEntity;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import static com.google.common.base.Predicates.instanceOf;
+import static org.apache.brooklyn.entity.group.DynamicGroup.ENTITY_FILTER;
+import static
org.apache.brooklyn.entity.group.DynamicMultiGroupImpl.bucketFromAttribute;
+
+public class PropagateToMembersRebindTest extends RebindTestFixtureWithApp {
+
+ private static final AttributeSensor<String> SENSOR =
Sensors.newSensor(String.class, "multigroup.test");
+
+ /**
+ * Tests {@link PropagateToMembers} to set initial value of the propagated
sensor to an existing member.
+ */
+ @Test
+ public void testPropagateToMembers_PropagateInitialValue() throws
Exception {
+
+ final String sensorInitialValue = "Initial value";
+
+ // Prepare sensor for Enricher to propagate to members.
+ final AttributeSensor<String> sensorToPropagate =
Sensors.newStringSensor("sensor-being-modified");
+
+ // Create member entity to propagate attribute to.
+ final BasicEntity memberEntity =
app().createAndManageChild(EntitySpec.create(BasicEntity.class));
+
+ // Create group entity, configure enricher and set member to test
propagation at.
+ final BasicGroup groupEntity =
app().createAndManageChild(EntitySpec.create(BasicGroup.class)
+ .members(ImmutableList.of(memberEntity))
+ .addInitializer(new
StaticSensor<String>(ConfigBag.newInstance(ImmutableMap.of(
+ StaticSensor.SENSOR_NAME, sensorToPropagate.getName(),
+ StaticSensor.STATIC_VALUE, sensorInitialValue))))
+ .enricher(EnricherSpec.create(PropagateToMembers.class)
+ .configure(PropagateToMembers.PROPAGATING,
ImmutableList.of(sensorToPropagate))));
+
+ // Verify that entity and member sensor have expected initial value.
+ EntityAsserts.assertAttributeEqualsEventually(groupEntity,
sensorToPropagate, sensorInitialValue);
+ EntityAsserts.assertAttributeEqualsEventually(memberEntity,
sensorToPropagate, sensorInitialValue);
+
+ rebind();
+ }
+
+ /**
+ * Tests {@link PropagateToMembers} to set initial value of the propagated
sensor to an existing member.
+ */
+ @Test
+ public void testPropagateToMembers_DynamicMultiGroup() throws Exception {
+ TestApplication app = app();
+
+ final String sensorInitialValue = "Initial value";
+
+ // Prepare sensor for Enricher to propagate to members.
+ final AttributeSensor<String> sensorToPropagate =
Sensors.newStringSensor("sensor-being-modified");
+
+ Group group =
app.createAndManageChild(EntitySpec.create(BasicGroup.class));
+ final DynamicMultiGroup dmg = app.createAndManageChild(
+ EntitySpec.create(DynamicMultiGroup.class)
+ .configure(ENTITY_FILTER, instanceOf(TestEntity.class))
+ .configure(DynamicMultiGroup.BUCKET_EXPRESSION,
"${entity.sensor['"+SENSOR.getName()+"']}")
+ .configure(DynamicMultiGroup.BUCKET_SPEC,
EntitySpec.create(BasicGroup.class)
+
.enricher(EnricherSpec.create(PropagateToMembers.class)
+
.configure(PropagateToMembers.PROPAGATING,
ImmutableList.of(sensorToPropagate))) )
+ .addInitializer(new
StaticSensor<String>(ConfigBag.newInstance(ImmutableMap.of(
+ StaticSensor.SENSOR_NAME,
sensorToPropagate.getName(),
+ StaticSensor.STATIC_VALUE,
sensorInitialValue))))
+ .enricher(EnricherSpec.create(PropagateToMembers.class)
+ .configure(PropagateToMembers.PROPAGATING,
ImmutableList.of(sensorToPropagate)))
+ );
+ app.subscriptions().subscribeToChildren(group, SENSOR, new
SensorEventListener<String>() {
+ @Override
+ public void onEvent(SensorEvent<String> event) {
dmg.rescanEntities(); }
+ });
+
+ EntitySpec<TestEntity> childSpec = EntitySpec.create(TestEntity.class);
+ TestEntity child1 =
group.addChild(EntitySpec.create(childSpec).displayName("child1"));
+ TestEntity child2 =
group.addChild(EntitySpec.create(childSpec).displayName("child2"));
+
+ EntityAsserts.assertAttributeEqualsEventually(child1,
sensorToPropagate, sensorInitialValue);
+
+ rebind();
+ }
+
+}
diff --git
a/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersTest.java
b/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersTest.java
index cb11b5bd04..edc531ee0a 100644
---
a/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersTest.java
+++
b/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersTest.java
@@ -95,7 +95,7 @@ public class PropagateToMembersTest extends
BrooklynAppUnitTestSupport {
// Create group entity, configure enricher and set member to test
propagation at.
final BasicGroup groupEntity =
app.createAndManageChild(EntitySpec.create(BasicGroup.class)
.addInitializer(new
StaticSensor<String>(ConfigBag.newInstance(ImmutableMap.of(
- StaticSensor.SENSOR_NAME, "sensor-being-modified",
+ StaticSensor.SENSOR_NAME, sensorToPropagate.getName(),
StaticSensor.STATIC_VALUE, sensorInitialValue))))
.enricher(EnricherSpec.create(PropagateToMembers.class)
.configure(PropagateToMembers.PROPAGATING,
ImmutableList.of(sensorToPropagate))));
diff --git
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupRebindTest.java
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupRebindTest.java
index cf37e24635..67329dd7d2 100644
---
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupRebindTest.java
+++
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupRebindTest.java
@@ -24,11 +24,18 @@ import com.google.common.io.Files;
import org.apache.brooklyn.api.mgmt.ha.MementoCopyMode;
import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData;
import static
org.apache.brooklyn.core.entity.EntityPredicates.displayNameEqualTo;
+
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils;
import org.apache.brooklyn.core.test.entity.TestApplication;
+
+import static
org.apache.brooklyn.entity.group.DynamicMultiGroup.BUCKET_EXPRESSION;
import static
org.apache.brooklyn.entity.group.DynamicMultiGroup.BUCKET_FUNCTION;
import static
org.apache.brooklyn.entity.group.DynamicMultiGroupImpl.bucketFromAttribute;
+
+import org.apache.brooklyn.core.workflow.WorkflowBasicTest;
import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.core.config.ConfigBag;
import org.testng.Assert;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -55,7 +62,14 @@ public class DynamicMultiGroupRebindTest extends
RebindTestFixtureWithApp {
private static final AttributeSensor<String> SENSOR =
Sensors.newSensor(String.class, "multigroup.test");
- // Previously there was a bug on rebind. The entity's rebind would
immediately connec the
+ @Override
+ protected LocalManagementContext
decorateOrigOrNewManagementContext(LocalManagementContext mgmt) {
+ LocalManagementContext result =
super.decorateOrigOrNewManagementContext(mgmt);
+ WorkflowBasicTest.addWorkflowStepTypes(result);
+ return result;
+ }
+
+ // Previously there was a bug on rebind. The entity's rebind would
immediately connec the
// rescan, which would start executing in another thread. If there were
any empty buckets
// (i.e. groups) that child would be removed. But the rebind-manager would
still be executing
// concurrently. The empty group that was being removed might not have
been reconstituted yet.
@@ -66,6 +80,15 @@ public class DynamicMultiGroupRebindTest extends
RebindTestFixtureWithApp {
// of the entities will be interleaved.
@Test(groups="Integration", invocationCount=10)
public void testRebindWhenGroupDisappeared() throws Exception {
+ doTestRebindWhenGroupDisappeared(
ConfigBag.newInstance().configure(BUCKET_FUNCTION, bucketFromAttribute(SENSOR))
);
+ }
+
+ @Test(groups="WIP", invocationCount=10) // TODO workflows don't run when
shutting down so if entities removed while shutting down, this can still leak
+ public void testRebindWhenGroupDisappearedUsingExpression() throws
Exception {
+ doTestRebindWhenGroupDisappeared(
ConfigBag.newInstance().configure(BUCKET_EXPRESSION,
"${entity.sensor['"+SENSOR.getName()+"']}") );
+ }
+
+ public void doTestRebindWhenGroupDisappeared(ConfigBag config) throws
Exception {
int NUM_ITERATIONS = 10;
List<DynamicMultiGroup> dmgs = Lists.newArrayList();
List<TestEntity> childs = Lists.newArrayList();
@@ -73,12 +96,13 @@ public class DynamicMultiGroupRebindTest extends
RebindTestFixtureWithApp {
// Create lots of DynamicMultiGroups - one entity for each
for (int i = 0; i < NUM_ITERATIONS; i++) {
Group group =
origApp.createAndManageChild(EntitySpec.create(BasicGroup.class));
- DynamicMultiGroup dmg =
origApp.createAndManageChild(EntitySpec.create(DynamicMultiGroup.class)
- .displayName("dmg"+i)
- .configure(DynamicMultiGroup.ENTITY_FILTER,
Predicates.and(EntityPredicates.displayNameEqualTo("child"+i),
instanceOf(TestEntity.class)))
+ EntitySpec<DynamicMultiGroup> spec =
EntitySpec.create(DynamicMultiGroup.class)
+ .displayName("dmg" + i)
+ .configure(DynamicMultiGroup.ENTITY_FILTER,
Predicates.and(displayNameEqualTo("child" + i), instanceOf(TestEntity.class)))
.configure(DynamicMultiGroup.RESCAN_INTERVAL, 5L)
- .configure(BUCKET_FUNCTION, bucketFromAttribute(SENSOR))
- .configure(DynamicMultiGroup.BUCKET_SPEC,
EntitySpec.create(BasicGroup.class)));
+ .configure(DynamicMultiGroup.BUCKET_SPEC,
EntitySpec.create(BasicGroup.class));
+ spec.configure(config.getAllConfig());
+ DynamicMultiGroup dmg = origApp.createAndManageChild(spec);
dmgs.add(dmg);
TestEntity child =
group.addChild(EntitySpec.create(TestEntity.class).displayName("child"+i));
@@ -129,7 +153,10 @@ public class DynamicMultiGroupRebindTest extends
RebindTestFixtureWithApp {
public void testSimplestMultiGroupRebindAndDelete() throws Exception {
DynamicMultiGroup dmg =
origApp.createAndManageChild(EntitySpec.create(DynamicMultiGroup.class)
.configure(DynamicMultiGroup.ENTITY_FILTER,
Predicates.alwaysFalse())
- .configure(BUCKET_FUNCTION, bucketFromAttribute(SENSOR))
+
+// .configure(BUCKET_FUNCTION, bucketFromAttribute(SENSOR))
+ .configure(BUCKET_EXPRESSION,
"${entity.sensor['"+SENSOR.getName()+"']}")
+
.configure(DynamicMultiGroup.BUCKET_SPEC,
EntitySpec.create(BasicGroup.class)));
BrooklynMementoRawData state;
diff --git
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
index dd6e30eea9..6fd24805d7 100644
---
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
+++
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
@@ -54,20 +54,24 @@ public class DynamicMultiGroupTest extends
BrooklynAppUnitTestSupport {
@Test
public void testBucketDistributionFromSubscription() {
-
doTestBucketDistributionFromSubscription(ConfigBag.newInstance().configure(BUCKET_FUNCTION,
bucketFromAttribute(SENSOR)));
+ doTestBucketDistributionFromSubscription(ConfigBag.newInstance()
+ .configure(BUCKET_FUNCTION, bucketFromAttribute(SENSOR))
+ .configure(DynamicMultiGroup.BUCKET_ID_FUNCTION,
bucketFromAttribute(SENSOR)));
}
@Test
public void testBucketDistributionFromSubscriptionWithWorkflow() {
WorkflowBasicTest.addWorkflowStepTypes(mgmt);
-
doTestBucketDistributionFromSubscription(ConfigBag.newInstance().configure(BUCKET_WORKFLOW,
- TypeCoercions.coerce(MutableMap.of("steps",
MutableList.of("let x = ${entity.sensor['"+SENSOR.getName()+"']} ?? \"\"",
"return ${x}")), CustomWorkflowStep.class) ));
+ doTestBucketDistributionFromSubscription(ConfigBag.newInstance()
+ .configure(BUCKET_ID_WORKFLOW,
+ TypeCoercions.coerce(MutableMap.of("steps",
MutableList.of("let x = ${entity.sensor['"+SENSOR.getName()+"']} ?? \"\"",
"return ${x}")), CustomWorkflowStep.class) ));
}
@Test
public void testBucketDistributionFromSubscriptionWithWorkflowExpression()
{
WorkflowBasicTest.addWorkflowStepTypes(mgmt);
-
doTestBucketDistributionFromSubscription(ConfigBag.newInstance().configure(BUCKET_EXPRESSION,
"${entity.sensor['"+SENSOR.getName()+"']}"));
+ doTestBucketDistributionFromSubscription(ConfigBag.newInstance()
+ .configure(BUCKET_ID_EXPRESSION,
"${entity.sensor['"+SENSOR.getName()+"']}"));
}
void doTestBucketDistributionFromSubscription(ConfigBag config) {
@@ -75,7 +79,6 @@ public class DynamicMultiGroupTest extends
BrooklynAppUnitTestSupport {
final DynamicMultiGroup dmg = app.createAndManageChild(
EntitySpec.create(DynamicMultiGroup.class)
.configure(ENTITY_FILTER, instanceOf(TestEntity.class))
- .configure(DynamicMultiGroup.BUCKET_ID_FUNCTION,
bucketFromAttribute(SENSOR))
.configure(config.getAllConfig())
);
app.subscriptions().subscribeToChildren(group, SENSOR, new
SensorEventListener<String>() {