Fix aggregating enrichers using producer for members - previously, if an aggregating enricher was configured to use a specific producer together with fromMembers or fromChildren, it did not correctly use the members/children of the given producer
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/f1f412bd Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/f1f412bd Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/f1f412bd Branch: refs/heads/master Commit: f1f412bd390e2630492cd7b1bc567762e2b88c4a Parents: de79106 Author: Aled Sage <[email protected]> Authored: Wed May 28 18:51:46 2014 +0100 Committer: Aled Sage <[email protected]> Committed: Fri May 30 10:24:38 2014 +0100 ---------------------------------------------------------------------- .../brooklyn/enricher/basic/Aggregator.java | 15 ++- .../enricher/CustomAggregatingEnricherTest.java | 133 ++++++++++++++++++- 2 files changed, 135 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f1f412bd/core/src/main/java/brooklyn/enricher/basic/Aggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/basic/Aggregator.java b/core/src/main/java/brooklyn/enricher/basic/Aggregator.java index 6003660..ba398b4 100644 --- a/core/src/main/java/brooklyn/enricher/basic/Aggregator.java +++ b/core/src/main/java/brooklyn/enricher/basic/Aggregator.java @@ -108,19 +108,22 @@ public class Aggregator<T,U> extends AbstractEnricher implements SensorEventList } if (Boolean.TRUE.equals(fromMembers)) { - subscribe(entity, Changeable.MEMBER_ADDED, new SensorEventListener<Entity>() { + checkState(producer instanceof Group, "must be a group when fromMembers true: producer=%s; entity=%s; " + + "hardcodedProducers=%s", getConfig(PRODUCER), entity, fromHardcodedProducers); + + subscribe(producer, Changeable.MEMBER_ADDED, new SensorEventListener<Entity>() { @Override public void onEvent(SensorEvent<Entity> event) { if (entityFilter.apply(event.getValue())) 
addProducer(event.getValue()); } }); - subscribe(entity, Changeable.MEMBER_REMOVED, new SensorEventListener<Entity>() { + subscribe(producer, Changeable.MEMBER_REMOVED, new SensorEventListener<Entity>() { @Override public void onEvent(SensorEvent<Entity> event) { removeProducer(event.getValue()); } }); - if (entity instanceof Group) { - for (Entity member : Iterables.filter(((Group)entity).getMembers(), entityFilter)) { + if (producer instanceof Group) { + for (Entity member : Iterables.filter(((Group)producer).getMembers(), entityFilter)) { addProducer(member); } } @@ -131,12 +134,12 @@ public class Aggregator<T,U> extends AbstractEnricher implements SensorEventList if (LOG.isDebugEnabled()) LOG.debug("{} linked (children of {}, {}) to {}", new Object[] {this, producer, sourceSensor, targetSensor}); subscribeToChildren(producer, sourceSensor, this); - subscribe(entity, AbstractEntity.CHILD_REMOVED, new SensorEventListener<Entity>() { + subscribe(producer, AbstractEntity.CHILD_REMOVED, new SensorEventListener<Entity>() { @Override public void onEvent(SensorEvent<Entity> event) { onProducerRemoved(event.getValue()); } }); - subscribe(entity, AbstractEntity.CHILD_ADDED, new SensorEventListener<Entity>() { + subscribe(producer, AbstractEntity.CHILD_ADDED, new SensorEventListener<Entity>() { @Override public void onEvent(SensorEvent<Entity> event) { if (entityFilter.apply(event.getValue())) onProducerAdded(event.getValue()); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f1f412bd/core/src/test/java/brooklyn/enricher/CustomAggregatingEnricherTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/enricher/CustomAggregatingEnricherTest.java b/core/src/test/java/brooklyn/enricher/CustomAggregatingEnricherTest.java index 2c01213..4a7303a 100644 --- a/core/src/test/java/brooklyn/enricher/CustomAggregatingEnricherTest.java +++ 
b/core/src/test/java/brooklyn/enricher/CustomAggregatingEnricherTest.java @@ -15,6 +15,7 @@ import brooklyn.entity.basic.Entities; import brooklyn.entity.proxying.EntitySpec; import brooklyn.event.AttributeSensor; import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.location.LocationSpec; import brooklyn.location.basic.SimulatedLocation; import brooklyn.test.EntityTestUtils; import brooklyn.test.entity.TestApplication; @@ -35,6 +36,7 @@ public class CustomAggregatingEnricherTest { TestApplication app; TestEntity entity; + SimulatedLocation loc; AttributeSensor<Integer> intSensor; AttributeSensor<Double> doubleSensor; @@ -47,8 +49,8 @@ public class CustomAggregatingEnricherTest { intSensor = new BasicAttributeSensor<Integer>(Integer.class, "int sensor"); doubleSensor = new BasicAttributeSensor<Double>(Double.class, "double sensor"); target = new BasicAttributeSensor<Integer>(Integer.class, "target sensor"); - - app.start(ImmutableList.of(new SimulatedLocation())); + loc = app.getManagementContext().getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class)); + app.start(ImmutableList.of(loc)); } @AfterMethod(alwaysRun=true) @@ -337,12 +339,9 @@ public class CustomAggregatingEnricherTest { EntityTestUtils.assertAttributeEqualsEventually(group, target, 1); } - @Test(groups = "Integration") + @Test(groups = "Integration", invocationCount=50) public void testAggregatesGroupMembersFiftyTimes() { - for (int i=0; i<50; i++) { - log.debug("testAggregatesNewMembersOfGroup {}", i); - testAggregatesNewMembersOfGroup(); - } + testAggregatesNewMembersOfGroup(); } @Test @@ -373,6 +372,34 @@ public class CustomAggregatingEnricherTest { } @Test + public void testAggregatesMembersOfProducer() { + BasicGroup group = app.addChild(EntitySpec.create(BasicGroup.class)); + TestEntity p1 = app.getManagementContext().getEntityManager().createEntity(EntitySpec.create(TestEntity.class).parent(group)); + TestEntity p2 = 
app.getManagementContext().getEntityManager().createEntity(EntitySpec.create(TestEntity.class).parent(group)); + group.addMember(p1); + group.addMember(p2); + p1.setAttribute(intSensor, 1); + Entities.manage(group); + + app.addEnricher(Enrichers.builder() + .aggregating(intSensor) + .publishing(target) + .computingSum() + .from(group) + .fromMembers() + .build()); + + + EntityTestUtils.assertAttributeEqualsEventually(app, target, 1); + + p2.setAttribute(intSensor, 2); + EntityTestUtils.assertAttributeEqualsEventually(app, target, 3); + + group.removeMember(p2); + EntityTestUtils.assertAttributeEqualsEventually(app, target, 1); + } + + @Test public void testAppliesFilterWhenAggregatingMembersOfGroup() { BasicGroup group = app.createAndManageChild(EntitySpec.create(BasicGroup.class)); TestEntity p1 = app.createAndManageChild(EntitySpec.create(TestEntity.class)); @@ -399,6 +426,98 @@ public class CustomAggregatingEnricherTest { } @Test + public void testAggregatesNewChidren() { + entity.addEnricher(Enrichers.builder() + .aggregating(intSensor) + .publishing(target) + .computingSum() + .fromChildren() + .defaultValueForUnreportedSensors(0) + .valueToReportIfNoSensors(0) + .build()); + + EntityTestUtils.assertAttributeEqualsEventually(entity, target, 0); + + TestEntity p1 = entity.createAndManageChild(EntitySpec.create(TestEntity.class)); + p1.setAttribute(intSensor, 1); + EntityTestUtils.assertAttributeEqualsEventually(entity, target, 1); + + TestEntity p2 = entity.createAndManageChild(EntitySpec.create(TestEntity.class)); + p2.setAttribute(intSensor, 2); + EntityTestUtils.assertAttributeEqualsEventually(entity, target, 3); + + Entities.unmanage(p2); + EntityTestUtils.assertAttributeEqualsEventually(entity, target, 1); + } + + @Test + public void testAggregatesExistingChildren() { + TestEntity p1 = entity.createAndManageChild(EntitySpec.create(TestEntity.class)); + TestEntity p2 = entity.createAndManageChild(EntitySpec.create(TestEntity.class)); + 
p1.setAttribute(intSensor, 1); + + entity.addEnricher(Enrichers.builder() + .aggregating(intSensor) + .publishing(target) + .computingSum() + .fromChildren() + .build()); + + + EntityTestUtils.assertAttributeEqualsEventually(entity, target, 1); + + p2.setAttribute(intSensor, 2); + EntityTestUtils.assertAttributeEqualsEventually(entity, target, 3); + + Entities.unmanage(p2); + EntityTestUtils.assertAttributeEqualsEventually(entity, target, 1); + } + + @Test + public void testAggregatesChildrenOfProducer() { + TestEntity p1 = entity.createAndManageChild(EntitySpec.create(TestEntity.class)); + TestEntity p2 = entity.createAndManageChild(EntitySpec.create(TestEntity.class)); + p1.setAttribute(intSensor, 1); + + app.addEnricher(Enrichers.builder() + .aggregating(intSensor) + .publishing(target) + .computingSum() + .from(entity) + .fromChildren() + .build()); + + + EntityTestUtils.assertAttributeEqualsEventually(app, target, 1); + + p2.setAttribute(intSensor, 2); + EntityTestUtils.assertAttributeEqualsEventually(app, target, 3); + + Entities.unmanage(p2); + EntityTestUtils.assertAttributeEqualsEventually(app, target, 1); + } + + @Test + public void testAppliesFilterWhenAggregatingChildrenOfGroup() { + TestEntity p1 = entity.createAndManageChild(EntitySpec.create(TestEntity.class)); + p1.setAttribute(intSensor, 1); + + entity.addEnricher(Enrichers.builder() + .aggregating(intSensor) + .publishing(target) + .computingSum() + .fromChildren() + .entityFilter(Predicates.equalTo((Entity)p1)) + .build()); + + EntityTestUtils.assertAttributeEqualsEventually(entity, target, 1); + + TestEntity p2 = entity.createAndManageChild(EntitySpec.create(TestEntity.class)); + p2.setAttribute(intSensor, 2); + EntityTestUtils.assertAttributeEqualsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), entity, target, 1); + } + + @Test public void testCustomAggregatingFunction() { TestEntity producer1 = app.createAndManageChild(EntitySpec.create(TestEntity.class)); 
Function<Collection<Integer>,Integer> aggregator = new Function<Collection<Integer>, Integer>() {
