Adds reducer enricher
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7e39e466
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7e39e466
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7e39e466

Branch: refs/heads/master
Commit: 7e39e46686a4206e69a85756fb1c18f83a7c5b41
Parents: 0c85cd9
Author: Martin Harris <[email protected]>
Authored: Tue Sep 22 14:56:02 2015 +0100
Committer: Robert Moss <[email protected]>
Committed: Thu Oct 1 16:37:52 2015 +0100

----------------------------------------------------------------------
 .../brooklyn/enricher/stock/Enrichers.java      | 52 ++++++++++++
 .../apache/brooklyn/enricher/stock/Reducer.java | 89 ++++++++++++++++++++
 .../brooklyn/enricher/stock/Transformer.java    |  2 +
 .../brooklyn/enricher/stock/ReducerTest.java    | 81 ++++++++++++++++++
 4 files changed, 224 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
index baed4f1..4273dec 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java
@@ -171,6 +171,9 @@ public class Enrichers {
         public JoinerBuilder joining(AttributeSensor<?> source) {
             return new JoinerBuilder(source);
         }
+        public ReducerBuilder reducing(List<AttributeSensor<?>> sourceSensors) {
+            return new ReducerBuilder(sourceSensors);
+        }
     }
 
 
@@ -676,6 +679,49 @@ public class Enrichers {
                     .toString();
         }
     }
+
+    protected abstract static class AbstractReducerBuilder<S, B extends AbstractReducerBuilder<S, B>> extends AbstractEnricherBuilder<B> {
+        protected AttributeSensor<S> publishing;
+        protected Entity fromEntity;
+        protected List<AttributeSensor<?>> reducing;
+        protected Function<List<AttributeSensor<?>>, String> computing;
+
+        public AbstractReducerBuilder(List<AttributeSensor<?>> val) {
+            super(Reducer.class);
+            this.reducing = checkNotNull(val);
+        }
+
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        public <S> ReducerBuilder<S> publishing(AttributeSensor<? extends S> val) {
+            this.publishing = (AttributeSensor) checkNotNull(val);
+            return (ReducerBuilder) this;
+        }
+
+        public B from(Entity val) {
+            this.fromEntity = checkNotNull(val);
+            return self();
+        }
+
+        public B computing(Function<List<AttributeSensor<?>>, String> val) {
+            this.computing = checkNotNull(val);
+            return self();
+        }
+
+        public EnricherSpec<?> build() {
+            return super.build().configure(MutableMap.builder()
+                    .put(Reducer.SOURCE_SENSORS, reducing)
+                    .put(Reducer.PRODUCER, fromEntity)
+                    .put(Reducer.TARGET_SENSOR, publishing)
+                    .put(Reducer.REDUCER_FUNCTION, computing)
+                    .build()
+            );
+        }
+
+        @Override
+        protected String getDefaultUniqueTag() {
+            return "reducer:" + reducing.toString();
+        }
+    }
 
     public static class InitialBuilder extends AbstractInitialBuilder<InitialBuilder> {
     }
@@ -729,6 +775,12 @@ public class Enrichers {
         }
     }
 
+    public static class ReducerBuilder<S> extends AbstractReducerBuilder<S, ReducerBuilder<S>> {
+        public ReducerBuilder(List<AttributeSensor<?>> val) {
+            super(val);
+        }
+    }
+
     @Beta
     private abstract static class ComputingNumber<T extends Number> implements Function<Collection<T>, T> {
         protected final Number defaultValueForUnreportedSensors;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
new file mode 100644
index 0000000..e22fcff
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java
@@ -0,0 +1,89 @@
+package org.apache.brooklyn.enricher.stock;
+
+import java.util.List;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntityLocal;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.Sensor;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.enricher.AbstractEnricher;
+import org.apache.brooklyn.util.core.flags.SetFromFlag;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.core.task.ValueResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.api.client.util.Lists;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.reflect.TypeToken;
+
+public class Reducer extends AbstractEnricher implements SensorEventListener<Object> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Reducer.class);
+
+    @SetFromFlag("producer")
+    public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
+
+    public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
+    public static ConfigKey<List<? extends AttributeSensor<?>>> SOURCE_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<? extends AttributeSensor<?>>>() {}, "enricher.sourceSensors");
+    public static ConfigKey<Function<List<?>,?>> REDUCER_FUNCTION = ConfigKeys.newConfigKey(new TypeToken<Function<List<?>, ?>>() {}, "enricher.reducerFunction");
+
+    protected Entity producer;
+
+    protected List<AttributeSensor<?>> subscribedSensors;
+    protected Sensor<?> targetSensor;
+
+    protected Function<List<?>, ?> reducerFunction;
+
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        Preconditions.checkNotNull(getConfig(SOURCE_SENSORS), "source sensors");
+
+        this.producer = getConfig(PRODUCER) == null ? entity : getConfig(PRODUCER);
+        List<AttributeSensor<?>> sensorListTemp = Lists.newArrayList();
+
+        for (Object sensorO : getConfig(SOURCE_SENSORS)) {
+            AttributeSensor<?> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+            if(!sensorListTemp.contains(sensor)) {
+                sensorListTemp.add(sensor);
+            }
+        }
+
+        reducerFunction = config().get(REDUCER_FUNCTION);
+        Preconditions.checkState(sensorListTemp.size() > 0, "Nothing to reduce");
+
+        for (Sensor<?> sensor : sensorListTemp) {
+            subscribe(producer, sensor, this);
+        }
+
+        subscribedSensors = ImmutableList.copyOf(sensorListTemp);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public void onEvent(SensorEvent<Object> event) {
+        Sensor<?> destinationSensor = getConfig(TARGET_SENSOR);
+
+        List<Object> values = Lists.newArrayList();
+
+        for (AttributeSensor<?> sourceSensor : subscribedSensors) {
+            Object resolvedSensorValue = entity.sensors().get(sourceSensor);
+            values.add(resolvedSensorValue);
+        }
+
+        Object result = reducerFunction.apply(values);
+
+        if (LOG.isTraceEnabled()) LOG.trace("enricher {} got {}, propagating via {} as {}",
+                new Object[] {this, event, entity, reducerFunction, destinationSensor});
+
+        emit((Sensor)destinationSensor, result);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
index ef23ab4..f15b2b2 100644
--- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
+++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.config.ConfigKey;
 import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.sensor.Sensors;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.core.task.ValueResolver;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7e39e466/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
new file mode 100644
index 0000000..7a350eb
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java
@@ -0,0 +1,81 @@
+package org.apache.brooklyn.enricher.stock;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.api.sensor.EnricherSpec;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.EntityTestUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+
+public class ReducerTest extends BrooklynAppUnitTestSupport {
+
+    public static final AttributeSensor<String> STR1 = Sensors.newStringSensor("test.str1");
+    public static final AttributeSensor<String> STR2 = Sensors.newStringSensor("test.str2");
+    public static final AttributeSensor<String> STR3 = Sensors.newStringSensor("test.str3");
+
+    private TestEntity entity;
+
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
+    }
+
+    @Test
+    public void testBasicReducer() {
+        entity.addEnricher(EnricherSpec.create(Reducer.class)
+                .configure(Reducer.PRODUCER, entity)
+                .configure(Reducer.SOURCE_SENSORS, ImmutableList.of(STR1, STR2))
+                .configure(Reducer.TARGET_SENSOR, STR3)
+                .configure(Reducer.REDUCER_FUNCTION, new Concatenator())
+        );
+        entity.sensors().set(STR1, "foo");
+        EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
+
+        entity.sensors().set(STR2, "bar");
+        EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foobar");
+    }
+
+    @Test
+    public void testReducingBuilder() {
+        entity.addEnricher(Enrichers.builder().reducing(ImmutableList.<AttributeSensor<?>>of(STR1, STR2))
+                .from(entity)
+                .computing(new Concatenator())
+                .publishing(STR3)
+                .build()
+        );
+
+        entity.sensors().set(STR1, "foo");
+        EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null);
+
+        entity.sensors().set(STR2, "bar");
+        EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foobar");
+    }
+
+    private class Concatenator implements Function<List<?>, String> {
+        @Nullable
+        @Override
+        public String apply(List<?> values) {
+            String result = "";
+            for (Object value : values) {
+                if (value == null) {
+                    return null;
+                } else {
+                    result += String.valueOf(value);
+                }
+            }
+            return result;
+        }
+    }
+}
