add a new "updatingMap" enricher which helps when multiple enrichers contribute to a map, and use it for the new chaining of service_up behaviour
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/a8bff36e Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/a8bff36e Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/a8bff36e Branch: refs/heads/master Commit: a8bff36ec5cbaaa4c4f10d0adfd464ba8d75b8a7 Parents: 45e3035 Author: Alex Heneveld <[email protected]> Authored: Wed Aug 6 17:05:41 2014 -0400 Committer: Alex Heneveld <[email protected]> Committed: Mon Aug 25 09:32:26 2014 +0100 ---------------------------------------------------------------------- .../main/java/brooklyn/enricher/Enrichers.java | 72 +++++++++ .../brooklyn/enricher/basic/UpdatingMap.java | 149 +++++++++++++++++++ .../java/brooklyn/enricher/EnrichersTest.java | 50 ++++++- .../entity/basic/SoftwareProcessImpl.java | 26 +--- 4 files changed, 274 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a8bff36e/core/src/main/java/brooklyn/enricher/Enrichers.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/Enrichers.java b/core/src/main/java/brooklyn/enricher/Enrichers.java index dab423c..6ddc780 100644 --- a/core/src/main/java/brooklyn/enricher/Enrichers.java +++ b/core/src/main/java/brooklyn/enricher/Enrichers.java @@ -30,6 +30,7 @@ import brooklyn.enricher.basic.Aggregator; import brooklyn.enricher.basic.Combiner; import brooklyn.enricher.basic.Propagator; import brooklyn.enricher.basic.Transformer; +import brooklyn.enricher.basic.UpdatingMap; import brooklyn.entity.Entity; import brooklyn.event.AttributeSensor; import brooklyn.event.Sensor; @@ -97,6 +98,12 @@ public class Enrichers { public <S> AggregatorBuilder<S, Object, ?> aggregating(AttributeSensor<S> val) { return new ConcreteAggregatorBuilder<S,Object>(val); } + /** creates an 
{@link UpdatingMap} enricher: + * {@link UpdatingMapBuilder#from(AttributeSensor)} and {@link UpdatingMapBuilder#computing(Function)} are required + **/ + public <S,TKey,TVal> UpdatingMapBuilder<S, TKey, TVal> updatingMap(AttributeSensor<Map<TKey,TVal>> target) { + return new UpdatingMapBuilder<S, TKey, TVal>(target); + } } @@ -117,6 +124,8 @@ public class Enrichers { public AggregatorBuilder(AttributeSensor<S> aggregating) { this.aggregating = aggregating; } + // TODO change the signature of this to have correct type info as done for UpdatingMapBuilder.from(Sensor) + // (including change *Builder to Abstract*Builder and Concrete*Builder to *Builder, for all other enricher types) @SuppressWarnings({ "unchecked", "rawtypes" }) public B publishing(AttributeSensor<? extends T> val) { this.publishing = (AttributeSensor) checkNotNull(val); @@ -443,6 +452,61 @@ public class Enrichers { } } + public abstract static class AbstractUpdatingMapBuilder<S, TKey, TVal, B extends AbstractUpdatingMapBuilder<S, TKey, TVal, B>> extends Builder<B> { + protected AttributeSensor<Map<TKey,TVal>> targetSensor; + protected AttributeSensor<? extends S> fromSensor; + protected TKey key; + protected Function<S, ? extends TVal> computing; + protected Boolean removingIfResultIsNull; + + public AbstractUpdatingMapBuilder(AttributeSensor<Map<TKey,TVal>> target) { + this.targetSensor = target; + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <S2 extends S> UpdatingMapBuilder<S2,TKey,TVal> from(AttributeSensor<S2> fromSensor) { + this.fromSensor = checkNotNull(fromSensor); + return (UpdatingMapBuilder) this; + } + public B computing(Function<S,? 
extends TVal> val) { + this.computing = checkNotNull(val); + return self(); + } + /** sets an explicit key to use; defaults to using the name of the source sensor specified in {@link #from(AttributeSensor)} */ + public B key(TKey key) { + this.key = key; + return self(); + } + /** sets explicit behaviour for treating <code>null</code> return values; + * default is to remove */ + public B removingIfResultIsNull(boolean val) { + this.removingIfResultIsNull = val; + return self(); + } + public EnricherSpec<?> build() { + return EnricherSpec.create(UpdatingMap.class) + .uniqueTag("updating:"+targetSensor+"<-"+fromSensor) + .configure(MutableMap.builder() + .put(UpdatingMap.TARGET_SENSOR, targetSensor) + .put(UpdatingMap.SOURCE_SENSOR, fromSensor) + .putIfNotNull(UpdatingMap.KEY_IN_TARGET_SENSOR, key) + .put(UpdatingMap.COMPUTING, computing) + .putIfNotNull(UpdatingMap.REMOVING_IF_RESULT_IS_NULL, removingIfResultIsNull) + .build()); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .omitNullValues() + .add("publishing", targetSensor) + .add("fromSensor", fromSensor) + .add("key", key) + .add("computing", computing) + .add("removingIfResultIsNull", removingIfResultIsNull) + .toString(); + } + } + private static class ConcreteInitialBuilder extends InitialBuilder<ConcreteInitialBuilder> { } @@ -482,6 +546,12 @@ public class Enrichers { } } + public static class UpdatingMapBuilder<S, TKey, TVal> extends AbstractUpdatingMapBuilder<S, TKey, TVal, UpdatingMapBuilder<S, TKey, TVal>> { + public UpdatingMapBuilder(AttributeSensor<Map<TKey,TVal>> val) { + super(val); + } + } + protected static <T extends Number> T average(Collection<T> vals, Number defaultValueForUnreportedSensors, Number valueToReportIfNoSensors, TypeToken<T> type) { Double doubleValueToReportIfNoSensors = (valueToReportIfNoSensors == null) ? 
null : valueToReportIfNoSensors.doubleValue(); int count = count(vals, defaultValueForUnreportedSensors!=null); @@ -529,4 +599,6 @@ public class Enrichers { } return result; } + + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a8bff36e/core/src/main/java/brooklyn/enricher/basic/UpdatingMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/enricher/basic/UpdatingMap.java b/core/src/main/java/brooklyn/enricher/basic/UpdatingMap.java new file mode 100644 index 0000000..f85852c --- /dev/null +++ b/core/src/main/java/brooklyn/enricher/basic/UpdatingMap.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package brooklyn.enricher.basic; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.event.AttributeSensor; +import brooklyn.event.Sensor; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.flags.SetFromFlag; + +import com.google.common.base.Function; +import com.google.common.reflect.TypeToken; + +/** + * Enricher which updates an entry in a sensor map ({@link #TARGET_SENSOR}) + * based on the value of another sensor ({@link #SOURCE_SENSOR}). + * <p> + * The key used defaults to the name of the source sensor but can be specified with {@link #KEY_IN_TARGET_SENSOR}. + * The value placed in the map is the result of applying the function in {@link #COMPUTING} to the sensor value, + * with default behaviour being to remove an entry if <code>null</code> is returned + * but this can be overridden by setting {@link #REMOVING_IF_RESULT_IS_NULL} false. + * {@link Entities#REMOVE} and {@link Entities#UNCHANGED} are also respected as return values for the computation + * (ignoring generics). 
+ * + * @author alex + * + * @param <S> source sensor type + * @param <TKey> key type in target sensor map + * @param <TVal> value type in target sensor map + */ +@SuppressWarnings("serial") +public class UpdatingMap<S,TKey,TVal> extends AbstractEnricher implements SensorEventListener<S> { + + private static final Logger LOG = LoggerFactory.getLogger(UpdatingMap.class); + + @SetFromFlag("fromSensor") + public static final ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor"); + @SetFromFlag("targetSensor") + public static final ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); + @SetFromFlag("key") + public static final ConfigKey<?> KEY_IN_TARGET_SENSOR = ConfigKeys.newConfigKey(Object.class, "enricher.updatingMap.keyInTargetSensor", + "Key to update in the target sensor map, defaulting to the name of the source sensor"); + @SetFromFlag("computing") + public static final ConfigKey<Function<?, ?>> COMPUTING = ConfigKeys.newConfigKey(new TypeToken<Function<?,?>>() {}, "enricher.updatingMap.computing"); + @SetFromFlag("removingIfResultIsNull") + public static final ConfigKey<Boolean> REMOVING_IF_RESULT_IS_NULL = ConfigKeys.newBooleanConfigKey("enricher.updatingMap.removingIfResultIsNull", + "Whether the key in the target map is removed if the result if the computation is null"); + + protected AttributeSensor<S> sourceSensor; + protected AttributeSensor<Map<TKey,TVal>> targetSensor; + protected TKey key; + protected Function<S,? 
extends TVal> computing; + protected Boolean removingIfResultIsNull; + + public UpdatingMap() { + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + this.sourceSensor = (AttributeSensor<S>) getRequiredConfig(SOURCE_SENSOR); + this.targetSensor = (AttributeSensor<Map<TKey,TVal>>) getRequiredConfig(TARGET_SENSOR); + this.key = (TKey) getConfig(KEY_IN_TARGET_SENSOR); + this.computing = (Function) getRequiredConfig(COMPUTING); + this.removingIfResultIsNull = getConfig(REMOVING_IF_RESULT_IS_NULL); + + subscribe(entity, sourceSensor, this); + onUpdated(); + } + + @Override + public void onEvent(SensorEvent<S> event) { + onUpdated(); + } + + /** + * Called whenever the values for the set of producers changes (e.g. on an event, or on a member added/removed). + */ + @SuppressWarnings("unchecked") + protected void onUpdated() { + try { + Object v = computing.apply(entity.getAttribute(sourceSensor)); + if (v == null && !Boolean.FALSE.equals(removingIfResultIsNull)) { + v = Entities.REMOVE; + } + if (v == Entities.UNCHANGED) { + // nothing + } else { + // TODO check synchronization + TKey key = this.key; + if (key==null) key = (TKey) sourceSensor.getName(); + + Map<TKey, TVal> map = entity.getAttribute(targetSensor); + + boolean created = (map==null); + if (created) map = MutableMap.of(); + + boolean changed; + if (v == Entities.REMOVE) { + changed = map.containsKey(key); + if (changed) + map.remove(key); + } else { + TVal oldV = map.get(key); + if (oldV==null) + changed = (v!=null || !map.containsKey(key)); + else + changed = !oldV.equals(v); + if (changed) + map.put(key, (TVal)v); + } + if (changed || created) + emit(targetSensor, map); + } + } catch (Throwable t) { + LOG.warn("Error calculating map update for enricher "+this, t); + throw Exceptions.propagate(t); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a8bff36e/core/src/test/java/brooklyn/enricher/EnrichersTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/enricher/EnrichersTest.java b/core/src/test/java/brooklyn/enricher/EnrichersTest.java index 8dc9615..8bfd2bb 100644 --- a/core/src/test/java/brooklyn/enricher/EnrichersTest.java +++ b/core/src/test/java/brooklyn/enricher/EnrichersTest.java @@ -19,6 +19,7 @@ package brooklyn.enricher; import java.util.Collection; +import java.util.Map; import java.util.Set; import org.testng.annotations.BeforeMethod; @@ -33,8 +34,9 @@ import brooklyn.event.SensorEvent; import brooklyn.event.basic.Sensors; import brooklyn.test.EntityTestUtils; import brooklyn.test.entity.TestEntity; +import brooklyn.util.collections.MutableMap; import brooklyn.util.collections.MutableSet; -import brooklyn.util.guava.TypeTokens; +import brooklyn.util.guava.Functionals; import brooklyn.util.text.StringFunctions; import com.google.common.base.Function; @@ -45,6 +47,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.reflect.TypeToken; +@SuppressWarnings("serial") public class EnrichersTest extends BrooklynAppUnitTestSupport { public static final AttributeSensor<Integer> NUM1 = Sensors.newIntegerSensor("test.num1"); @@ -54,6 +57,9 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport { public static final AttributeSensor<String> STR2 = Sensors.newStringSensor("test.str2"); public static final AttributeSensor<Set<Object>> SET1 = Sensors.newSensor(new TypeToken<Set<Object>>() {}, "test.set1", "set1 descr"); public static final AttributeSensor<Long> LONG1 = Sensors.newLongSensor("test.long1"); + public static final AttributeSensor<Map<String,String>> MAP1 = Sensors.newSensor(new TypeToken<Map<String,String>>() {}, "test.map1", "map1 descr"); + @SuppressWarnings("rawtypes") + public static 
final AttributeSensor<Map> MAP2 = Sensors.newSensor(Map.class, "test.map2"); private TestEntity entity; private TestEntity entity2; @@ -68,6 +74,7 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport { group = app.createAndManageChild(EntitySpec.create(BasicGroup.class)); } + @SuppressWarnings("unchecked") @Test public void testAdding() { entity.addEnricher(Enrichers.builder() @@ -81,6 +88,7 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport { EntityTestUtils.assertAttributeEqualsEventually(entity, NUM3, 5); } + @SuppressWarnings("unchecked") @Test public void testCombiningWithCustomFunction() { entity.addEnricher(Enrichers.builder() @@ -94,6 +102,7 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport { EntityTestUtils.assertAttributeEqualsEventually(entity, NUM3, 1); } + @SuppressWarnings("unchecked") @Test(groups="Integration") // because takes a second public void testCombiningRespectsUnchanged() { entity.addEnricher(Enrichers.builder() @@ -147,7 +156,7 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport { entity.addEnricher(Enrichers.builder() .transforming(NUM1) .publishing(LONG1) - .computing((Function)Functions.constant(Integer.valueOf(1))) + .computing(Functions.constant(Integer.valueOf(1))) .build()); entity.setAttribute(NUM1, 123); @@ -312,7 +321,7 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport { .aggregating(NUM1) .publishing(LONG1) .fromMembers() - .computing((Function)Functions.constant(Integer.valueOf(1))) + .computing(Functions.constant(Integer.valueOf(1))) .build()); entity.setAttribute(NUM1, 123); @@ -342,4 +351,39 @@ public class EnrichersTest extends BrooklynAppUnitTestSupport { entity.setAttribute(NUM1, 987654); EntityTestUtils.assertAttributeEqualsContinually(group, LONG1, Long.valueOf(123)); } + @Test + public void testUpdatingMap1() { + entity.addEnricher(Enrichers.builder() + .updatingMap(MAP1) + .from(LONG1) + .computing(Functionals.when(-1L).value("-1 is not allowed")) + 
.build()); + + doUpdatingMapChecks(MAP1); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testUpdatingMap2() { + entity.addEnricher(Enrichers.builder() + .updatingMap((AttributeSensor)MAP2) + .from(LONG1) + .computing(Functionals.when(-1L).value("-1 is not allowed")) + .build()); + + doUpdatingMapChecks(MAP2); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + protected void doUpdatingMapChecks(AttributeSensor mapSensor) { + EntityTestUtils.assertAttributeEqualsEventually(entity, mapSensor, MutableMap.<String,String>of()); + + entity.setAttribute(LONG1, -1L); + EntityTestUtils.assertAttributeEqualsEventually(entity, mapSensor, MutableMap.<String,String>of( + LONG1.getName(), "-1 is not allowed")); + + entity.setAttribute(LONG1, 1L); + EntityTestUtils.assertAttributeEqualsEventually(entity, mapSensor, MutableMap.<String,String>of()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a8bff36e/software/base/src/main/java/brooklyn/entity/basic/SoftwareProcessImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/basic/SoftwareProcessImpl.java b/software/base/src/main/java/brooklyn/entity/basic/SoftwareProcessImpl.java index fb872b0..56bb319 100644 --- a/software/base/src/main/java/brooklyn/entity/basic/SoftwareProcessImpl.java +++ b/software/base/src/main/java/brooklyn/entity/basic/SoftwareProcessImpl.java @@ -51,13 +51,13 @@ import brooklyn.util.collections.MutableMap; import brooklyn.util.collections.MutableSet; import brooklyn.util.config.ConfigBag; import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Functionals; import brooklyn.util.task.DynamicTasks; import brooklyn.util.task.Tasks; import brooklyn.util.time.CountdownTimer; import brooklyn.util.time.Duration; import brooklyn.util.time.Time; -import com.google.common.base.Function; import com.google.common.base.Functions; import 
com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -163,25 +163,11 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft })) .build(); - // FIXME quick-and-dirty change - Function<Boolean, Map<String,Object>> f = new Function<Boolean, Map<String,Object>>() { - @Override - public Map<String, Object> apply(Boolean input) { - Map<String, Object> result = getAttribute(Attributes.SERVICE_NOT_UP_INDICATORS); - if (result==null) result = MutableMap.of(); - // TODO only change/publish if it needs changing... - if (Boolean.TRUE.equals(input)) { - result.remove(SERVICE_PROCESS_IS_RUNNING.getName()); - return result; - } else { - result.put(SERVICE_PROCESS_IS_RUNNING.getName(), "Process not running (according to driver checkRunning)"); - return result; - } - } - }; - addEnricher(Enrichers.builder().transforming(SERVICE_PROCESS_IS_RUNNING).publishing(Attributes.SERVICE_NOT_UP_INDICATORS) - .computing(f).build()); - + addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) + .from(SERVICE_PROCESS_IS_RUNNING) + .computing(Functionals.when(false).value("Process not running (according to driver checkRunning)")) + .build()); + // FIXME lives elsewhere addEnricher(Enrichers.builder().transforming(Attributes.SERVICE_NOT_UP_INDICATORS).publishing(Attributes.SERVICE_UP) .computing( Functions.forPredicate(CollectionFunctionals.<String>mapSizeEquals(0)) ).build());
