add generic type reducer and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/f0ca5748 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/f0ca5748 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/f0ca5748 Branch: refs/heads/master Commit: f0ca574819a66cb4799eb794f63929e174e44f95 Parents: 7e39e46 Author: Robert Moss <[email protected]> Authored: Wed Sep 23 12:24:24 2015 +0100 Committer: Robert Moss <[email protected]> Committed: Thu Oct 1 16:37:52 2015 +0100 ---------------------------------------------------------------------- .../brooklyn/enricher/stock/Enrichers.java | 50 ++++-- .../apache/brooklyn/enricher/stock/Reducer.java | 113 ++++++++++--- .../brooklyn/enricher/stock/ReducerTest.java | 163 +++++++++++++++++-- 3 files changed, 274 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f0ca5748/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java index 4273dec..25d186f 100644 --- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Enrichers.java @@ -48,6 +48,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -171,8 +172,8 @@ public class Enrichers { public JoinerBuilder joining(AttributeSensor<?> source) { return new JoinerBuilder(source); } - public ReducerBuilder reducing(List<AttributeSensor<?>> sourceSensors) { - return new ReducerBuilder(sourceSensors); + public <S, T> ReducerBuilder<S, T> reducing(Class<? extends Reducer<S, T>> clazz, List<AttributeSensor<S>> sourceSensors) { + return new ReducerBuilder<S, T>(clazz, sourceSensors); } } @@ -680,21 +681,22 @@ public class Enrichers { } } - protected abstract static class AbstractReducerBuilder<S, B extends AbstractReducerBuilder<S, B>> extends AbstractEnricherBuilder<B> { - protected AttributeSensor<S> publishing; + protected abstract static class AbstractReducerBuilder<S, T, B extends AbstractReducerBuilder<S, T, B>> extends AbstractEnricherBuilder<B> { + protected AttributeSensor<T> publishing; protected Entity fromEntity; - protected List<AttributeSensor<?>> reducing; - protected Function<List<AttributeSensor<?>>, String> computing; + protected List<AttributeSensor<S>> reducing; + protected Function<List<S>, T> computing; + protected String functionName; + private Map<String, Object> parameters; - public AbstractReducerBuilder(List<AttributeSensor<?>> val) { - super(Reducer.class); + public AbstractReducerBuilder(Class<? extends Reducer<S, T>> clazz, List<AttributeSensor<S>> val) { + super(clazz); this.reducing = checkNotNull(val); } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public <S> ReducerBuilder<S> publishing(AttributeSensor<? extends S> val) { - this.publishing = (AttributeSensor) checkNotNull(val); - return (ReducerBuilder) this; + + public B publishing(AttributeSensor<T> val) { + this.publishing = checkNotNull(val); + return self(); } public B from(Entity val) { @@ -702,17 +704,29 @@ public class Enrichers { return self(); } - public B computing(Function<List<AttributeSensor<?>>, String> val) { + public B computing(Function<List<S>, T> val) { this.computing = checkNotNull(val); return self(); } + + public B computing(String functionName) { + return computing(functionName, ImmutableMap.<String, Object>of()); + } + + public B computing(String functionName, Map<String, Object> parameters) { + this.functionName = functionName; + this.parameters = parameters; + return self(); + } public EnricherSpec<?> build() { return super.build().configure(MutableMap.builder() .put(Reducer.SOURCE_SENSORS, reducing) .put(Reducer.PRODUCER, fromEntity) .put(Reducer.TARGET_SENSOR, publishing) - .put(Reducer.REDUCER_FUNCTION, computing) + .putIfNotNull(Reducer.REDUCER_FUNCTION, computing) + .putIfNotNull(Reducer.REDUCER_FUNCTION_UNTYPED, functionName) + .putIfNotNull(Reducer.PARAMETERS, parameters) .build() ); } @@ -775,9 +789,9 @@ public class Enrichers { } } - public static class ReducerBuilder<S> extends AbstractReducerBuilder<S, ReducerBuilder<S>> { - public ReducerBuilder(List<AttributeSensor<?>> val) { - super(val); + public static class ReducerBuilder<S, T> extends AbstractReducerBuilder<S, T, ReducerBuilder<S, T>> { + public ReducerBuilder(Class<? extends Reducer<S, T>> clazz, List<AttributeSensor<S>> val) { + super(clazz, val); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f0ca5748/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java index e22fcff..938cf22 100644 --- a/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Reducer.java @@ -1,6 +1,8 @@ package org.apache.brooklyn.enricher.stock; +import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityLocal; @@ -23,59 +25,76 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.reflect.TypeToken; -public class Reducer extends AbstractEnricher implements SensorEventListener<Object> { +@SuppressWarnings("serial") +public abstract class Reducer<S, T> extends AbstractEnricher implements SensorEventListener<Object> { private static final Logger LOG = LoggerFactory.getLogger(Reducer.class); @SetFromFlag("producer") public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer"); - public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); public static ConfigKey<List<? extends AttributeSensor<?>>> SOURCE_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<? extends AttributeSensor<?>>>() {}, "enricher.sourceSensors"); public static ConfigKey<Function<List<?>,?>> REDUCER_FUNCTION = ConfigKeys.newConfigKey(new TypeToken<Function<List<?>, ?>>() {}, "enricher.reducerFunction"); - + @SetFromFlag("transformation") + public static final ConfigKey<String> REDUCER_FUNCTION_UNTYPED = ConfigKeys.newStringConfigKey("enricher.reducerFunction.untyped", + "A string matching a pre-defined named reducer function, such as join"); + public static final ConfigKey<Map<String, Object>> PARAMETERS = ConfigKeys.newConfigKey(new TypeToken<Map<String, Object>>() {}, "enricher.reducerFunctionParameters", + "A map of parameters to pass into the reducer function"); + protected Entity producer; + protected List<AttributeSensor<S>> subscribedSensors; + protected Sensor<T> targetSensor; + protected Function<List<S>, T> reducerFunction; - protected List<AttributeSensor<?>> subscribedSensors; - protected Sensor<?> targetSensor; - - protected Function<List<?>, ?> reducerFunction; - - + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void setEntity(EntityLocal entity) { super.setEntity(entity); Preconditions.checkNotNull(getConfig(SOURCE_SENSORS), "source sensors"); this.producer = getConfig(PRODUCER) == null ? entity : getConfig(PRODUCER); - List<AttributeSensor<?>> sensorListTemp = Lists.newArrayList(); + List<AttributeSensor<S>> sensorListTemp = Lists.newArrayList(); for (Object sensorO : getConfig(SOURCE_SENSORS)) { - AttributeSensor<?> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get(); + AttributeSensor<S> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get(); if(!sensorListTemp.contains(sensor)) { sensorListTemp.add(sensor); } } + + String reducerName = config().get(REDUCER_FUNCTION_UNTYPED); + Function<List<S>, T> reducerFunction = (Function) config().get(REDUCER_FUNCTION); + if(reducerFunction == null){ + Map<String, ?> parameters = config().get(PARAMETERS); + reducerFunction = createReducerFunction(reducerName, parameters); + } - reducerFunction = config().get(REDUCER_FUNCTION); + this.reducerFunction = reducerFunction; Preconditions.checkState(sensorListTemp.size() > 0, "Nothing to reduce"); - for (Sensor<?> sensor : sensorListTemp) { + for (Sensor<S> sensor : sensorListTemp) { subscribe(producer, sensor, this); } subscribedSensors = ImmutableList.copyOf(sensorListTemp); } - @SuppressWarnings({ "rawtypes", "unchecked" }) + protected abstract Function<List<S>, T> createReducerFunction(String reducerName, Map<String, ?> parameters); + + @SuppressWarnings("unchecked") @Override public void onEvent(SensorEvent<Object> event) { - Sensor<?> destinationSensor = getConfig(TARGET_SENSOR); + Sensor<T> destinationSensor = (Sensor<T>) getConfig(TARGET_SENSOR); - List<Object> values = Lists.newArrayList(); + List<S> values = Lists.newArrayList(); + + for (AttributeSensor<S> sourceSensor : subscribedSensors) { + S resolvedSensorValue = entity.sensors().get(sourceSensor); + if (resolvedSensorValue == null) { + // only apply function if all values are resolved + return; + } - for (AttributeSensor<?> sourceSensor : subscribedSensors) { - Object resolvedSensorValue = entity.sensors().get(sourceSensor); values.add(resolvedSensorValue); } @@ -84,6 +103,60 @@ public class Reducer extends AbstractEnricher implements SensorEventListener<Obj if (LOG.isTraceEnabled()) LOG.trace("enricher {} got {}, propagating via {} as {}", new Object[] {this, event, entity, reducerFunction, destinationSensor}); - emit((Sensor)destinationSensor, result); + emit((Sensor<T>)destinationSensor, result); + } + + public static class StringStringReducer extends Reducer<String, String> { + + public StringStringReducer() {} + + @Override + protected Function<List<String>, String> createReducerFunction( + String reducerName, Map<String, ?> parameters) { + if(reducerName.equals("joiner")){ + return new JoinerFunction(parameters.get("separator")); + } + throw new IllegalStateException("unknown function: " + reducerName); + } + } + + public static class JoinerReducerFunction<A> implements Function<List<A>, String> { + + private Object separator; + + public JoinerReducerFunction(Object separator) { + this.separator = (separator == null) ? ", " : separator; + } + + @Override + public String apply(List<A> input) { + + StringBuilder sb = new StringBuilder(); + Iterator<A> it = input.iterator(); + while(it.hasNext()) { + sb.append(it.next().toString()); + if(it.hasNext()){ + sb.append(separator); + } + } + return sb.toString(); + } + + } + + public static class JoinerFunction extends JoinerReducerFunction<String>{ + + public JoinerFunction(Object separator) { + super(separator); + } + } + + public static class ToStringReducerFunction<A> implements Function<List<A>, String> { + + @Override + public String apply(List<A> input) { + return input.toString(); + } + } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/f0ca5748/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java index 7a350eb..a39d76c 100644 --- a/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java +++ b/core/src/test/java/org/apache/brooklyn/enricher/stock/ReducerTest.java @@ -1,6 +1,7 @@ package org.apache.brooklyn.enricher.stock; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; @@ -10,18 +11,25 @@ import org.apache.brooklyn.api.sensor.EnricherSpec; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.enricher.stock.Reducer.StringStringReducer; +import org.apache.brooklyn.test.Asserts; import org.apache.brooklyn.test.EntityTestUtils; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; public class ReducerTest extends BrooklynAppUnitTestSupport { public static final AttributeSensor<String> STR1 = Sensors.newStringSensor("test.str1"); public static final AttributeSensor<String> STR2 = Sensors.newStringSensor("test.str2"); public static final AttributeSensor<String> STR3 = Sensors.newStringSensor("test.str3"); + public static final AttributeSensor<Integer> INT1 = Sensors.newIntegerSensor("test.int1"); private TestEntity entity; @@ -33,13 +41,18 @@ public class ReducerTest extends BrooklynAppUnitTestSupport { } @Test - public void testBasicReducer() { - entity.addEnricher(EnricherSpec.create(Reducer.class) - .configure(Reducer.PRODUCER, entity) - .configure(Reducer.SOURCE_SENSORS, ImmutableList.of(STR1, STR2)) - .configure(Reducer.TARGET_SENSOR, STR3) - .configure(Reducer.REDUCER_FUNCTION, new Concatenator()) + public void testBasicReducer(){ + entity.addEnricher(EnricherSpec.create(StringStringReducer.class).configure( + MutableMap.of( + Reducer.SOURCE_SENSORS, ImmutableList.of(STR1, STR2), + Reducer.PRODUCER, entity, + Reducer.TARGET_SENSOR, STR3, + Reducer.REDUCER_FUNCTION, new Concatenator()) + ) ); + + EntityTestUtils.assertAttributeEquals(entity, STR3, null); + entity.sensors().set(STR1, "foo"); EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null); @@ -48,34 +61,156 @@ public class ReducerTest extends BrooklynAppUnitTestSupport { } @Test - public void testReducingBuilder() { - entity.addEnricher(Enrichers.builder().reducing(ImmutableList.<AttributeSensor<?>>of(STR1, STR2)) + public void testReducingBuilderWithConcatenator() { + entity.addEnricher(Enrichers.builder() + .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2)) .from(entity) .computing(new Concatenator()) .publishing(STR3) .build() ); + EntityTestUtils.assertAttributeEquals(entity, STR3, null); + entity.sensors().set(STR1, "foo"); EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null); entity.sensors().set(STR2, "bar"); EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foobar"); } + + @Test + public void testReducingBuilderWithLengthCalculator() { + entity.addEnricher(Enrichers.builder() + .reducing(StringIntegerReducer.class, ImmutableList.of(STR1, STR2)) + .from(entity) + .computing(new LengthCalculator()) + .publishing(INT1) + .build() + ); + + EntityTestUtils.assertAttributeEquals(entity, INT1, null); + + entity.sensors().set(STR1, "foo"); + EntityTestUtils.assertAttributeEqualsContinually(entity, INT1, null); + + entity.sensors().set(STR2, "bar"); + EntityTestUtils.assertAttributeEqualsEventually(entity, INT1, 6); + } + + @Test + public void testReducingBuilderWithJoinerFunction() { + entity.addEnricher(Enrichers.builder() + .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2)) + .from(entity) + .computing("joiner", ImmutableMap.<String, Object>of("separator", "-")) + .publishing(STR3) + .build() + ); + + EntityTestUtils.assertAttributeEquals(entity, STR3, null); + + entity.sensors().set(STR1, "foo"); + EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null); + + entity.sensors().set(STR2, "bar"); + EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foo-bar"); + } + + @Test + public void testReducingBuilderWithJoinerFunctionWithDefaultParameter() { + entity.addEnricher(Enrichers.builder() + .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2)) + .from(entity) + .computing("joiner") + .publishing(STR3) + .build() + ); + EntityTestUtils.assertAttributeEquals(entity, STR3, null); + + entity.sensors().set(STR1, "foo"); + EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null); + + entity.sensors().set(STR2, "bar"); + EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foo, bar"); + } + + @Test + public void testReducingBuilderWithJoinerFunctionAndUnusedParameter() { + + entity.addEnricher(Enrichers.builder() + .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2)) + .from(entity) + .computing("joiner", ImmutableMap.<String, Object>of("non.existent.parameter", "-")) + .publishing(STR3) + .build() + ); + EntityTestUtils.assertAttributeEquals(entity, STR3, null); + + entity.sensors().set(STR1, "foo"); + EntityTestUtils.assertAttributeEqualsContinually(entity, STR3, null); - private class Concatenator implements Function<List<?>, String> { + entity.sensors().set(STR2, "bar"); + EntityTestUtils.assertAttributeEqualsEventually(entity, STR3, "foo, bar"); + } + + @Test + public void testReducingBuilderWithNamedNonExistentFunction() { + try { + entity.addEnricher(Enrichers.builder() + .reducing(StringStringReducer.class, ImmutableList.of(STR1, STR2)) + .from(entity) + .computing("unknown function name", ImmutableMap.<String, Object>of("separator", "-")) + .publishing(STR3) + .build() + ); + Asserts.fail("Expected exception when adding reducing enricher with unknown named function"); + } catch (Exception e) { + Throwable t = Exceptions.getFirstThrowableOfType(e, IllegalStateException.class); + Assert.assertNotNull(t); + } + } + + private static class Concatenator implements Function<List<String>, String> { @Nullable @Override - public String apply(List<?> values) { - String result = ""; - for (Object value : values) { + public String apply(List<String> values) { + StringBuilder result = new StringBuilder(); + for (String value : values) { if (value == null) { return null; } else { - result += String.valueOf(value); + result.append(value); + } + } + return result.toString(); + } + } + + public static class StringIntegerReducer extends Reducer<String, Integer> { + + public StringIntegerReducer() {} + + @Override + protected Function<List<String>, Integer> createReducerFunction( + String reducerName, Map<String, ?> parameters) { + throw new IllegalStateException("unknown function: " + reducerName); + } + + } + + private static class LengthCalculator implements Function<List<String>, Integer>{ + + @Override + public Integer apply(List<String> values) { + int acc = 0; + for (String value : values) { + if (value != null) { + acc += value.length(); } } - return result; + return acc; } + } }
