This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git


The following commit(s) were added to refs/heads/master by this push:
     new c2d1ea600c fix some persistence issues with Dynamic MultiGroup and PropagateToMembers
c2d1ea600c is described below

commit c2d1ea600c1c7758e03f8499d85c22890fb479ff
Author: Alex Heneveld <[email protected]>
AuthorDate: Fri Dec 2 13:13:21 2022 +0000

    fix some persistence issues with Dynamic MultiGroup and PropagateToMembers
---
 .../core/mgmt/internal/LocalManagementContext.java |   4 +-
 .../core/objs/proxy/InternalEntityFactory.java     |   3 +-
 .../enricher/stock/PropagateToMembers.java         |  29 ++-
 .../brooklyn/entity/group/DynamicMultiGroup.java   |  14 +-
 .../entity/group/DynamicMultiGroupImpl.java        | 217 ++++++++++++---------
 .../stock/PropagateToMembersRebindTest.java        | 121 ++++++++++++
 .../enricher/stock/PropagateToMembersTest.java     |   2 +-
 .../entity/group/DynamicMultiGroupRebindTest.java  |  41 +++-
 .../entity/group/DynamicMultiGroupTest.java        |  13 +-
 9 files changed, 323 insertions(+), 121 deletions(-)

diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
index 01d45bfb90..200bde3580 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java
@@ -246,7 +246,9 @@ public class LocalManagementContext extends 
AbstractManagementContext {
 
     @Override
     public synchronized LocalEntityManager getEntityManager() {
-        if (!isRunning()) throw new IllegalStateException("Management context 
no longer running");
+        if (!isRunning()) {
+            throw new IllegalStateException("Management context no longer 
running");
+        }
 
         if (entityManager == null) {
             entityManager = new LocalEntityManager(this);
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
index 7c2e0721bc..393c99cfb8 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
@@ -201,7 +201,8 @@ public class InternalEntityFactory extends InternalFactory {
                 Exceptions.propagateIfFatal(e);
                 log.info("Failed to initialise entity " + entity + " and its 
descendants - unmanaging and propagating original exception: " + 
Exceptions.collapseText(e));
                 try {
-                    ((EntityManagerInternal) 
managementContext.getEntityManager()).discardPremanaged(entity);
+                    if (managementContext.isRunning())
+                        ((EntityManagerInternal) 
managementContext.getEntityManager()).discardPremanaged(entity);
                 } catch (Exception e2) {
                     Exceptions.propagateIfFatal(e2);
                     log.info("Failed to unmanage entity " + entity + " and its 
descendants, after failure to initialise (rethrowing original exception)", e2);
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/PropagateToMembers.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/PropagateToMembers.java
index 31df91134c..2b9f325e85 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/PropagateToMembers.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/PropagateToMembers.java
@@ -21,6 +21,7 @@ package org.apache.brooklyn.enricher.stock;
 import com.google.common.base.Predicates;
 import com.google.common.reflect.TypeToken;
 import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.entity.Group;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -31,6 +32,7 @@ import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
 import org.apache.brooklyn.core.enricher.AbstractEnricher;
 import org.apache.brooklyn.core.entity.trait.Changeable;
+import org.apache.brooklyn.util.collections.CollectionFunctionals;
 import org.apache.brooklyn.util.core.flags.SetFromFlag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +53,7 @@ public class PropagateToMembers extends AbstractEnricher 
implements SensorEventL
     public static final ConfigKey<Collection<? extends Sensor<?>>> PROPAGATING 
= ConfigKeys.builder(new TypeToken<Collection<? extends Sensor<?>>>() {})
             .name("enricher.propagating.inclusions")
             .description("Collection of sensors to propagate to members")
-            .constraint(Predicates.and(Objects::nonNull, sensors -> 
!sensors.isEmpty()))
+            .constraint(Predicates.and(Predicates.notNull(), 
CollectionFunctionals.notEmpty()))
             .build();
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -63,19 +65,30 @@ public class PropagateToMembers extends AbstractEnricher 
implements SensorEventL
             throw new IllegalStateException("Only valid for groups");
         }
 
-        subscriptions().subscribe(entity, Changeable.MEMBER_ADDED, event -> {
-            LOG.debug("Propagating '{}' to a new member '{}' ", 
getConfig(PROPAGATING), event.getValue());
-            getConfig(PROPAGATING).forEach(sensor -> {
+        subscriptions().subscribe(entity, Changeable.MEMBER_ADDED, new 
PropagationSubscriber(this));
+
+        getConfig(PROPAGATING).forEach(sensor ->  
subscriptions().subscribe(entity, sensor, this));
+    }
+
+    static class PropagationSubscriber implements SensorEventListener<Entity> {
+        private final PropagateToMembers adjunct;
+
+        public PropagationSubscriber(PropagateToMembers adjunct) {
+            this.adjunct = adjunct;
+        }
+
+        @Override
+        public void onEvent(SensorEvent<Entity> event) {
+            LOG.debug("Propagating '{}' to a new member '{}' ", 
adjunct.getConfig(PROPAGATING), event.getValue());
+            adjunct.getConfig(PROPAGATING).forEach(sensor -> {
                 AttributeSensor attributeSensor = (AttributeSensor<?>)sensor;
-                Object value = entity.getAttribute(attributeSensor);
+                Object value = adjunct.entity.getAttribute(attributeSensor);
                 if (!Objects.isNull(value)) {
                     LOG.debug("Propagating initial {}: {}", attributeSensor, 
value);
                     event.getValue().sensors().set(attributeSensor, value);
                 }
             });
-        });
-
-        getConfig(PROPAGATING).forEach(sensor ->  
subscriptions().subscribe(entity, sensor, this));
+        }
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
index 4410ac184a..fa77163f92 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroup.java
@@ -52,14 +52,12 @@ public interface DynamicMultiGroup extends DynamicGroup {
             "brooklyn.multigroup.bucketFunction",
             "Function to return the bucket (name) an entity should be placed 
in"
     );
-
     @SetFromFlag("bucketWorkflow")
     ConfigKey<CustomWorkflowStep> BUCKET_WORKFLOW = ConfigKeys.newConfigKey(
             CustomWorkflowStep.class,
             "brooklyn.multigroup.bucketWorkflow",
             "Workflow to return the bucket (name) an entity should be placed 
in"
     );
-
     @SetFromFlag("bucketExpression")
     ConfigKey<String> BUCKET_EXPRESSION = ConfigKeys.newConfigKey(
             String.class,
@@ -71,10 +69,18 @@ public interface DynamicMultiGroup extends DynamicGroup {
     ConfigKey<Function<Entity, String>> BUCKET_ID_FUNCTION = 
ConfigKeys.newConfigKey(
             new TypeToken<Function<Entity, String>>(){},
             "brooklyn.multigroup.bucketIdFunction",
-            "Implements the mapping from entity to bucket ID; if supplied, the 
ids of entities E1 and E2 should be the same if and only if the name returned 
by bucketFunction are the same; " +
-                    "if not supplied, no ID is set"
+            "Used at bucket creation time to generate an ID for the bucket. 
Should be unique if and only if the bucket (name) function is unique for two 
entities. " +
+                    "If not supplied, no ID is set."
     );
 
+    ConfigKey<CustomWorkflowStep> BUCKET_ID_WORKFLOW = ConfigKeys.newConfigKey(
+            CustomWorkflowStep.class,
+            "brooklyn.multigroup.bucketIdWorkflow");
+
+    ConfigKey<String> BUCKET_ID_EXPRESSION = ConfigKeys.newConfigKey(
+            String.class,
+            "brooklyn.multigroup.bucketIdExpression");
+
     /**
      * Determines the type of {@link Group} used for the buckets.
      *
diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
index 7a4e7d674e..82537ddf03 100644
--- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicMultiGroupImpl.java
@@ -18,14 +18,9 @@
  */
 package org.apache.brooklyn.entity.group;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.*;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.entity.Group;
@@ -50,15 +45,12 @@ import org.apache.brooklyn.util.time.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 public class DynamicMultiGroupImpl extends DynamicGroupImpl implements 
DynamicMultiGroup {
 
@@ -230,95 +222,132 @@ public class DynamicMultiGroupImpl extends 
DynamicGroupImpl implements DynamicMu
 
     @Override
     public void distributeEntities() {
-        synchronized (memberChangeMutex) {
-            if (Entities.isNoLongerManaged(this)) return;
-
-            Function<Entity, String> bucketFunctionF = 
getConfig(BUCKET_FUNCTION);
-            CustomWorkflowStep bucketFunctionW = getConfig(BUCKET_WORKFLOW);
-            String bucketFunctionE = getConfig(BUCKET_EXPRESSION);
-            if (bucketFunctionF == null && bucketFunctionW == null && 
bucketFunctionE == null) return;
-
-            if (bucketFunctionE != null) {
-                if (bucketFunctionW != null) LOG.warn("Ignoring bucket 
workflow because expression supplied");
-                bucketFunctionW = TypeCoercions.coerce(MutableMap.of("steps", 
MutableList.of("return "+bucketFunctionE)), CustomWorkflowStep.class);
-            }
-            if (bucketFunctionW != null) {
-                if (bucketFunctionF != null) LOG.warn("Ignoring bucket 
function because workflow or expression supplied");
-                CustomWorkflowStep bucketFunctionW2 = bucketFunctionW;
-                bucketFunctionF = entity -> {
-                    Maybe<Task<Object>> t = 
bucketFunctionW2.newWorkflowExecution(entity, "Workflow to find bucket name for 
" + entity, null).getTask(true);
-                    if (t.isAbsent()) {
-                        LOG.debug("Entity " + entity + " does not match 
condition to placed in the dynamic multigroup");
-                        return null;
-                    }
-                    try {
-                        return 
TypeCoercions.coerce(DynamicTasks.submit(t.get(), entity).getUnchecked(), 
String.class);
-                    } catch (Exception e) {
-                        Exceptions.propagateIfFatal(e);
-                        LOG.warn("Entity " + entity + " failed when trying to 
evaluate the bucket it goes in; not putting into a bucket");
-                        return null;
-                    }
-                };
-            }
-            if (bucketFunctionF == null) {
-                LOG.warn(this+" should have exactly one of: a bucket 
expression, workflow, or function");
-                return;
-            }
+        try {
+            synchronized (memberChangeMutex) {
+                if (Entities.isUnmanagingOrNoLongerManaged(this)) return;
 
-            Function<Entity, String> bucketIdFunction = 
getConfig(BUCKET_ID_FUNCTION);
+                Function<Entity, String> bucketFunctionF = 
getConfig(BUCKET_FUNCTION);
+                CustomWorkflowStep bucketFunctionW = 
getConfig(BUCKET_WORKFLOW);
+                String bucketFunctionE = getConfig(BUCKET_EXPRESSION);
 
-            EntitySpec<? extends BasicGroup> bucketSpec = 
getConfig(BUCKET_SPEC);
-            if (bucketSpec == null) return;
+                if (bucketFunctionE != null) {
+                    if (bucketFunctionW != null) LOG.warn("Ignoring bucket 
workflow because expression supplied");
+                    bucketFunctionW = 
TypeCoercions.coerce(MutableMap.of("steps", MutableList.of("return " + 
bucketFunctionE)), CustomWorkflowStep.class);
+                }
+                if (bucketFunctionW != null) {
+                    if (bucketFunctionF != null)
+                        LOG.warn("Ignoring bucket function because workflow or 
expression supplied");
+                    bucketFunctionF = new WorkflowFunction(bucketFunctionW);
+                }
 
-            Map<String, BasicGroup> buckets = 
MutableMap.copyOf(getAttribute(BUCKETS));
+                Function<Entity, String> bucketIdFunctionF = 
getConfig(BUCKET_ID_FUNCTION);
+                CustomWorkflowStep bucketIdFunctionW = 
getConfig(BUCKET_ID_WORKFLOW);
+                String bucketIdFunctionE = getConfig(BUCKET_ID_EXPRESSION);
 
-            // Bucketize the members where the function gives a non-null bucket
-            Function<Entity, String> bucketFunctionF2 = bucketFunctionF;
-            Multimap<String, Entity> entityMapping = 
getMembers().stream().collect(() -> Multimaps.newSetMultimap(MutableMap.of(), 
MutableSet::new),
-                    (map,entity) -> { String name = 
bucketFunctionF2.apply(entity); if (Strings.isNonBlank(name)) map.put(name, 
entity); },
-                    (m1,m2) -> m1.putAll(m2));
+                if (bucketIdFunctionE != null) {
+                    if (bucketIdFunctionW != null) LOG.warn("Ignoring bucket 
workflow because expression supplied");
+                    bucketIdFunctionW = 
TypeCoercions.coerce(MutableMap.of("steps", MutableList.of("return " + 
bucketIdFunctionE)), CustomWorkflowStep.class);
+                }
+                if (bucketIdFunctionW != null) {
+                    if (bucketIdFunctionF != null)
+                        LOG.warn("Ignoring bucket function because workflow or 
expression supplied");
+                    bucketIdFunctionF = new 
WorkflowFunction(bucketIdFunctionW);
+                }
 
-            // Now fill the buckets
-            Collection<Entity> oldChildren = getChildren();
-            for (String name : entityMapping.keySet()) {
-                BasicGroup bucket = buckets.get(name);
-                if (bucket == null) {
-                    try {
-                        EntitySpec<? extends BasicGroup> spec = 
EntitySpec.create(bucketSpec).displayName(name);
-                        if (bucketIdFunction!=null) {
-                            spec.configure(BrooklynConfigKeys.PLAN_ID, 
bucketIdFunction.apply(entityMapping.get(name).iterator().next()));
-                        }
+                if (bucketFunctionF == null) {
+                    if (bucketIdFunctionF!=null) {
+                        bucketFunctionF = bucketIdFunctionF;
+                    } else {
+                        LOG.warn(this + " should have exactly one of: a bucket 
expression, workflow, or function (optionally coming from the bucket ID 
function)");
+                        return;
+                    }
+                }
 
-                        bucket = addChild(spec);
-                    } catch (Exception e) {
-                        Exceptions.propagateIfFatal(e);
-                        ServiceProblemsLogic.updateProblemsIndicator(this, 
"children", "Could not add child; removing all new children for now: 
"+Exceptions.collapseText(e));
-                        // if we don't do this, they get added infinitely often
-                        MutableSet<Entity> newChildren = 
MutableSet.copyOf(getChildren());
-                        newChildren.removeAll(oldChildren);
-                        for (Entity child: newChildren) {
-                            removeChild(child);
+                EntitySpec<? extends BasicGroup> bucketSpec = 
getConfig(BUCKET_SPEC);
+                if (bucketSpec == null) return;
+
+                Map<String, BasicGroup> buckets = 
MutableMap.copyOf(getAttribute(BUCKETS));
+
+                // Bucketize the members where the function gives a non-null 
bucket
+                Function<Entity, String> bucketFunctionF2 = bucketFunctionF;
+                Multimap<String, Entity> entityMapping = 
getMembers().stream().collect(() -> Multimaps.newSetMultimap(MutableMap.of(), 
MutableSet::new),
+                        (map, entity) -> {
+                            String name = bucketFunctionF2.apply(entity);
+                            if (Strings.isNonBlank(name)) map.put(name, 
entity);
+                        },
+                        (m1, m2) -> m1.putAll(m2));
+
+                // Now fill the buckets
+                Collection<Entity> oldChildren = getChildren();
+                for (String name : entityMapping.keySet()) {
+                    BasicGroup bucket = buckets.get(name);
+                    if (bucket == null) {
+                        try {
+                            EntitySpec<? extends BasicGroup> spec = 
EntitySpec.create(bucketSpec).displayName(name);
+                            if (bucketIdFunctionF != null) {
+                                spec.configure(BrooklynConfigKeys.PLAN_ID, 
bucketIdFunctionF.apply(entityMapping.get(name).iterator().next()));
+                            }
+
+                            bucket = addChild(spec);
+                        } catch (Exception e) {
+                            Exceptions.propagateIfFatal(e);
+                            ServiceProblemsLogic.updateProblemsIndicator(this, 
"children", "Could not add child; removing all new children for now: " + 
Exceptions.collapseText(e));
+                            // if we don't do this, they get added infinitely 
often
+                            MutableSet<Entity> newChildren = 
MutableSet.copyOf(getChildren());
+                            newChildren.removeAll(oldChildren);
+                            for (Entity child : newChildren) {
+                                removeChild(child);
+                            }
+                            throw e;
                         }
-                        throw e;
+                        ServiceProblemsLogic.clearProblemsIndicator(this, 
"children");
+                        buckets.put(name, bucket);
                     }
-                    ServiceProblemsLogic.clearProblemsIndicator(this, 
"children");
-                    buckets.put(name, bucket);
+                    bucket.setMembers(entityMapping.get(name));
                 }
-                bucket.setMembers(entityMapping.get(name));
-            }
 
-            // Remove any now-empty buckets
-            Set<String> empty = 
ImmutableSet.copyOf(Sets.difference(buckets.keySet(), entityMapping.keySet()));
-            for (String name : empty) {
-                Group removed = buckets.remove(name);
-                LOG.debug(this+" removing empty child-bucket "+name+" -> 
"+removed);
-                removeChild(removed);
-                Entities.unmanage(removed);
-            }
+                // Remove any now-empty buckets
+                Set<String> empty = 
ImmutableSet.copyOf(Sets.difference(buckets.keySet(), entityMapping.keySet()));
+                for (String name : empty) {
+                    Group removed = buckets.remove(name);
+                    LOG.debug(this + " removing empty child-bucket " + name + 
" -> " + removed);
+                    removeChild(removed);
+                    Entities.unmanage(removed);
+                }
 
-            // Save the bucket mappings
-            sensors().set(BUCKETS, ImmutableMap.copyOf(buckets));
+                // Save the bucket mappings
+                sensors().set(BUCKETS, ImmutableMap.copyOf(buckets));
+            }
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            if (Entities.isUnmanagingOrNoLongerManaged(this)) {
+                LOG.debug("Error in "+this+" when unmanaged, ignoring: "+e);
+            } else {
+                throw Exceptions.propagate(e);
+            }
         }
     }
 
+    private static class WorkflowFunction implements Function<Entity, String> {
+        private final CustomWorkflowStep workflow;
+
+        public WorkflowFunction(CustomWorkflowStep bucketFunctionW) {
+            this.workflow = bucketFunctionW;
+        }
+
+        public String apply(Entity entity) {
+            Maybe<Task<Object>> t = workflow.newWorkflowExecution(entity, 
"Workflow to find bucket name for " + entity, null).getTask(true);
+            if (t.isAbsent()) {
+                LOG.debug("Entity " + entity + " does not match condition to 
placed in the dynamic multigroup");
+                return null;
+            }
+            try {
+                return TypeCoercions.coerce(DynamicTasks.submit(t.get(), 
entity).getUnchecked(), String.class);
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                LOG.warn("Entity " + entity + " failed when trying to evaluate 
the bucket it goes in; not putting into a bucket");
+                return null;
+            }
+        }
+    }
 }
diff --git a/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersRebindTest.java b/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersRebindTest.java
new file mode 100644
index 0000000000..60328ea911
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersRebindTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.enricher.stock;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.EnricherSpec;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.sensor.StaticSensor;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestApplication;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.entity.group.BasicGroup;
+import org.apache.brooklyn.entity.group.DynamicMultiGroup;
+import org.apache.brooklyn.entity.stock.BasicEntity;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import static com.google.common.base.Predicates.instanceOf;
+import static org.apache.brooklyn.entity.group.DynamicGroup.ENTITY_FILTER;
+import static 
org.apache.brooklyn.entity.group.DynamicMultiGroupImpl.bucketFromAttribute;
+
+public class PropagateToMembersRebindTest extends RebindTestFixtureWithApp {
+
+    private static final AttributeSensor<String> SENSOR = 
Sensors.newSensor(String.class, "multigroup.test");
+
+    /**
+     * Tests {@link PropagateToMembers} to set initial value of the propagated 
sensor to an existing member.
+     */
+    @Test
+    public void testPropagateToMembers_PropagateInitialValue() throws 
Exception {
+
+        final String sensorInitialValue = "Initial value";
+
+        // Prepare sensor for Enricher to propagate to members.
+        final AttributeSensor<String> sensorToPropagate = 
Sensors.newStringSensor("sensor-being-modified");
+
+        // Create member entity to propagate attribute to.
+        final BasicEntity memberEntity = 
app().createAndManageChild(EntitySpec.create(BasicEntity.class));
+
+        // Create group entity, configure enricher and set member to test 
propagation at.
+        final BasicGroup groupEntity = 
app().createAndManageChild(EntitySpec.create(BasicGroup.class)
+                .members(ImmutableList.of(memberEntity))
+                .addInitializer(new 
StaticSensor<String>(ConfigBag.newInstance(ImmutableMap.of(
+                        StaticSensor.SENSOR_NAME, sensorToPropagate.getName(),
+                        StaticSensor.STATIC_VALUE, sensorInitialValue))))
+                .enricher(EnricherSpec.create(PropagateToMembers.class)
+                        .configure(PropagateToMembers.PROPAGATING, 
ImmutableList.of(sensorToPropagate))));
+
+        // Verify that entity and member sensor have expected initial value.
+        EntityAsserts.assertAttributeEqualsEventually(groupEntity, 
sensorToPropagate, sensorInitialValue);
+        EntityAsserts.assertAttributeEqualsEventually(memberEntity, 
sensorToPropagate, sensorInitialValue);
+
+        rebind();
+    }
+
+    /**
+     * Tests {@link PropagateToMembers} to set initial value of the propagated 
sensor to an existing member.
+     */
+    @Test
+    public void testPropagateToMembers_DynamicMultiGroup() throws Exception {
+        TestApplication app = app();
+
+        final String sensorInitialValue = "Initial value";
+
+        // Prepare sensor for Enricher to propagate to members.
+        final AttributeSensor<String> sensorToPropagate = 
Sensors.newStringSensor("sensor-being-modified");
+
+        Group group = 
app.createAndManageChild(EntitySpec.create(BasicGroup.class));
+        final DynamicMultiGroup dmg = app.createAndManageChild(
+                EntitySpec.create(DynamicMultiGroup.class)
+                        .configure(ENTITY_FILTER, instanceOf(TestEntity.class))
+                        .configure(DynamicMultiGroup.BUCKET_EXPRESSION, 
"${entity.sensor['"+SENSOR.getName()+"']}")
+                        .configure(DynamicMultiGroup.BUCKET_SPEC, 
EntitySpec.create(BasicGroup.class)
+                                
.enricher(EnricherSpec.create(PropagateToMembers.class)
+                                        
.configure(PropagateToMembers.PROPAGATING, 
ImmutableList.of(sensorToPropagate))) )
+                        .addInitializer(new 
StaticSensor<String>(ConfigBag.newInstance(ImmutableMap.of(
+                                StaticSensor.SENSOR_NAME, 
sensorToPropagate.getName(),
+                                StaticSensor.STATIC_VALUE, 
sensorInitialValue))))
+                        .enricher(EnricherSpec.create(PropagateToMembers.class)
+                                .configure(PropagateToMembers.PROPAGATING, 
ImmutableList.of(sensorToPropagate)))
+        );
+        app.subscriptions().subscribeToChildren(group, SENSOR, new 
SensorEventListener<String>() {
+            @Override
+            public void onEvent(SensorEvent<String> event) { 
dmg.rescanEntities(); }
+        });
+
+        EntitySpec<TestEntity> childSpec = EntitySpec.create(TestEntity.class);
+        TestEntity child1 = 
group.addChild(EntitySpec.create(childSpec).displayName("child1"));
+        TestEntity child2 = 
group.addChild(EntitySpec.create(childSpec).displayName("child2"));
+
+        EntityAsserts.assertAttributeEqualsEventually(child1, 
sensorToPropagate, sensorInitialValue);
+
+        rebind();
+    }
+
+}
diff --git a/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersTest.java b/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersTest.java
index cb11b5bd04..edc531ee0a 100644
--- a/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersTest.java
+++ b/core/src/test/java/org/apache/brooklyn/enricher/stock/PropagateToMembersTest.java
@@ -95,7 +95,7 @@ public class PropagateToMembersTest extends 
BrooklynAppUnitTestSupport {
         // Create group entity, configure enricher and set member to test 
propagation at.
         final BasicGroup groupEntity = 
app.createAndManageChild(EntitySpec.create(BasicGroup.class)
                 .addInitializer(new 
StaticSensor<String>(ConfigBag.newInstance(ImmutableMap.of(
-                        StaticSensor.SENSOR_NAME, "sensor-being-modified",
+                        StaticSensor.SENSOR_NAME, sensorToPropagate.getName(),
                         StaticSensor.STATIC_VALUE, sensorInitialValue))))
                 .enricher(EnricherSpec.create(PropagateToMembers.class)
                         .configure(PropagateToMembers.PROPAGATING, 
ImmutableList.of(sensorToPropagate))));
diff --git a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupRebindTest.java b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupRebindTest.java
index cf37e24635..67329dd7d2 100644
--- a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupRebindTest.java
+++ b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupRebindTest.java
@@ -24,11 +24,18 @@ import com.google.common.io.Files;
 import org.apache.brooklyn.api.mgmt.ha.MementoCopyMode;
 import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData;
 import static 
org.apache.brooklyn.core.entity.EntityPredicates.displayNameEqualTo;
+
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
 import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils;
 import org.apache.brooklyn.core.test.entity.TestApplication;
+
+import static 
org.apache.brooklyn.entity.group.DynamicMultiGroup.BUCKET_EXPRESSION;
 import static 
org.apache.brooklyn.entity.group.DynamicMultiGroup.BUCKET_FUNCTION;
 import static 
org.apache.brooklyn.entity.group.DynamicMultiGroupImpl.bucketFromAttribute;
+
+import org.apache.brooklyn.core.workflow.WorkflowBasicTest;
 import org.apache.brooklyn.util.collections.MutableSet;
+import org.apache.brooklyn.util.core.config.ConfigBag;
 import org.testng.Assert;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
@@ -55,7 +62,14 @@ public class DynamicMultiGroupRebindTest extends 
RebindTestFixtureWithApp {
 
     private static final AttributeSensor<String> SENSOR = 
Sensors.newSensor(String.class, "multigroup.test");
 
-    // Previously there was a bug on rebind. The entity's rebind would 
immediately connec the 
+    @Override
+    protected LocalManagementContext 
decorateOrigOrNewManagementContext(LocalManagementContext mgmt) {
+        LocalManagementContext result = 
super.decorateOrigOrNewManagementContext(mgmt);
+        WorkflowBasicTest.addWorkflowStepTypes(result);
+        return result;
+    }
+
+    // Previously there was a bug on rebind. The entity's rebind would 
immediately connect the
     // rescan, which would start executing in another thread. If there were 
any empty buckets
     // (i.e. groups) that child would be removed. But the rebind-manager would 
still be executing
     // concurrently. The empty group that was being removed might not have 
been reconstituted yet.
@@ -66,6 +80,15 @@ public class DynamicMultiGroupRebindTest extends 
RebindTestFixtureWithApp {
     // of the entities will be interleaved.
     @Test(groups="Integration", invocationCount=10)
     public void testRebindWhenGroupDisappeared() throws Exception {
+        doTestRebindWhenGroupDisappeared( 
ConfigBag.newInstance().configure(BUCKET_FUNCTION, bucketFromAttribute(SENSOR)) 
);
+    }
+
+    @Test(groups="WIP", invocationCount=10)  // TODO workflows don't run when 
shutting down so if entities removed while shutting down, this can still leak
+    public void testRebindWhenGroupDisappearedUsingExpression() throws 
Exception {
+        doTestRebindWhenGroupDisappeared( 
ConfigBag.newInstance().configure(BUCKET_EXPRESSION, 
"${entity.sensor['"+SENSOR.getName()+"']}") );
+    }
+
+    public void doTestRebindWhenGroupDisappeared(ConfigBag config) throws 
Exception {
         int NUM_ITERATIONS = 10;
         List<DynamicMultiGroup> dmgs = Lists.newArrayList();
         List<TestEntity> childs = Lists.newArrayList();
@@ -73,12 +96,13 @@ public class DynamicMultiGroupRebindTest extends 
RebindTestFixtureWithApp {
         // Create lots of DynamicMultiGroups - one entity for each
         for (int i = 0; i < NUM_ITERATIONS; i++) {
             Group group = 
origApp.createAndManageChild(EntitySpec.create(BasicGroup.class));
-            DynamicMultiGroup dmg = 
origApp.createAndManageChild(EntitySpec.create(DynamicMultiGroup.class)
-                    .displayName("dmg"+i)
-                    .configure(DynamicMultiGroup.ENTITY_FILTER, 
Predicates.and(EntityPredicates.displayNameEqualTo("child"+i), 
instanceOf(TestEntity.class)))
+            EntitySpec<DynamicMultiGroup> spec = 
EntitySpec.create(DynamicMultiGroup.class)
+                    .displayName("dmg" + i)
+                    .configure(DynamicMultiGroup.ENTITY_FILTER, 
Predicates.and(displayNameEqualTo("child" + i), instanceOf(TestEntity.class)))
                     .configure(DynamicMultiGroup.RESCAN_INTERVAL, 5L)
-                    .configure(BUCKET_FUNCTION, bucketFromAttribute(SENSOR))
-                    .configure(DynamicMultiGroup.BUCKET_SPEC, 
EntitySpec.create(BasicGroup.class)));
+                    .configure(DynamicMultiGroup.BUCKET_SPEC, 
EntitySpec.create(BasicGroup.class));
+            spec.configure(config.getAllConfig());
+            DynamicMultiGroup dmg = origApp.createAndManageChild(spec);
             dmgs.add(dmg);
             
             TestEntity child = 
group.addChild(EntitySpec.create(TestEntity.class).displayName("child"+i));
@@ -129,7 +153,10 @@ public class DynamicMultiGroupRebindTest extends 
RebindTestFixtureWithApp {
     public void testSimplestMultiGroupRebindAndDelete() throws Exception {
         DynamicMultiGroup dmg = 
origApp.createAndManageChild(EntitySpec.create(DynamicMultiGroup.class)
                 .configure(DynamicMultiGroup.ENTITY_FILTER, 
Predicates.alwaysFalse())
-                .configure(BUCKET_FUNCTION, bucketFromAttribute(SENSOR))
+
+//                .configure(BUCKET_FUNCTION, bucketFromAttribute(SENSOR))
+                .configure(BUCKET_EXPRESSION, 
"${entity.sensor['"+SENSOR.getName()+"']}")
+
                 .configure(DynamicMultiGroup.BUCKET_SPEC, 
EntitySpec.create(BasicGroup.class)));
 
         BrooklynMementoRawData state;
diff --git 
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
 
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
index dd6e30eea9..6fd24805d7 100644
--- 
a/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/entity/group/DynamicMultiGroupTest.java
@@ -54,20 +54,24 @@ public class DynamicMultiGroupTest extends 
BrooklynAppUnitTestSupport {
 
     @Test
     public void testBucketDistributionFromSubscription() {
-        
doTestBucketDistributionFromSubscription(ConfigBag.newInstance().configure(BUCKET_FUNCTION,
 bucketFromAttribute(SENSOR)));
+        doTestBucketDistributionFromSubscription(ConfigBag.newInstance()
+                .configure(BUCKET_FUNCTION, bucketFromAttribute(SENSOR))
+                .configure(DynamicMultiGroup.BUCKET_ID_FUNCTION, 
bucketFromAttribute(SENSOR)));
     }
 
     @Test
     public void testBucketDistributionFromSubscriptionWithWorkflow() {
         WorkflowBasicTest.addWorkflowStepTypes(mgmt);
-        
doTestBucketDistributionFromSubscription(ConfigBag.newInstance().configure(BUCKET_WORKFLOW,
-                TypeCoercions.coerce(MutableMap.of("steps", 
MutableList.of("let x = ${entity.sensor['"+SENSOR.getName()+"']} ?? \"\"", 
"return ${x}")), CustomWorkflowStep.class) ));
+        doTestBucketDistributionFromSubscription(ConfigBag.newInstance()
+                .configure(BUCKET_ID_WORKFLOW,
+                    TypeCoercions.coerce(MutableMap.of("steps", 
MutableList.of("let x = ${entity.sensor['"+SENSOR.getName()+"']} ?? \"\"", 
"return ${x}")), CustomWorkflowStep.class) ));
     }
 
     @Test
     public void testBucketDistributionFromSubscriptionWithWorkflowExpression() 
{
         WorkflowBasicTest.addWorkflowStepTypes(mgmt);
-        
doTestBucketDistributionFromSubscription(ConfigBag.newInstance().configure(BUCKET_EXPRESSION,
 "${entity.sensor['"+SENSOR.getName()+"']}"));
+        doTestBucketDistributionFromSubscription(ConfigBag.newInstance()
+                .configure(BUCKET_ID_EXPRESSION, 
"${entity.sensor['"+SENSOR.getName()+"']}"));
     }
 
     void doTestBucketDistributionFromSubscription(ConfigBag config) {
@@ -75,7 +79,6 @@ public class DynamicMultiGroupTest extends 
BrooklynAppUnitTestSupport {
         final DynamicMultiGroup dmg = app.createAndManageChild(
                 EntitySpec.create(DynamicMultiGroup.class)
                         .configure(ENTITY_FILTER, instanceOf(TestEntity.class))
-                        .configure(DynamicMultiGroup.BUCKET_ID_FUNCTION, 
bucketFromAttribute(SENSOR))
                         .configure(config.getAllConfig())
         );
         app.subscriptions().subscribeToChildren(group, SENSOR, new 
SensorEventListener<String>() {


Reply via email to