http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/SensorTransformingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/SensorTransformingEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/SensorTransformingEnricher.java new file mode 100644 index 0000000..92319f1 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/SensorTransformingEnricher.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.util.groovy.GroovyJavaMethods; +import org.apache.brooklyn.util.javalang.JavaClassNames; +import org.apache.brooklyn.util.time.Duration; + +import groovy.lang.Closure; + +import com.google.common.base.Function; + +/** + * @deprecated since 0.7.0; use {@link Enrichers.builder()} + * @see Transformer if need to sub-class + */ +public class SensorTransformingEnricher<T,U> extends AbstractTypeTransformingEnricher { + + private Function<? super T, ? extends U> transformation; + + public SensorTransformingEnricher(Entity producer, Sensor<T> source, Sensor<U> target, Function<? super T, ? extends U> transformation) { + super(producer, source, target); + this.transformation = transformation; + this.uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+source.getName()+"*->"+target.getName();; + } + + public SensorTransformingEnricher(Entity producer, Sensor<T> source, Sensor<U> target, Closure transformation) { + this(producer, source, target, GroovyJavaMethods.functionFromClosure(transformation)); + } + + public SensorTransformingEnricher(Sensor<T> source, Sensor<U> target, Function<T,U> transformation) { + this(null, source, target, transformation); + } + + public SensorTransformingEnricher(Sensor<T> source, Sensor<U> target, Closure transformation) { + this(null, source, target, GroovyJavaMethods.functionFromClosure(transformation)); + } + + @Override + public void onEvent(SensorEvent event) { + if (accept((T)event.getValue())) { + if (target instanceof AttributeSensor) + entity.setAttribute((AttributeSensor)target, compute((T)event.getValue())); + else + entity.emit(target, compute((T)event.getValue())); + } + } + + protected boolean accept(T value) { + return true; + } + + protected U compute(T value) { + return transformation.apply(value); + } + + /** + * creates an enricher which listens to a source (from the producer), + * transforms it and publishes it under the target + * + * Instead, consider calling: + * <pre> + * {@code + * addEnricher(Enrichers.builder() + * .transforming(source) + * .publishing(target) + * .from(producer) + * .computing(transformation) + * .build()); + * } + * </pre> + * + * @deprecated since 0.7.0; use {@link Enrichers.builder()} + */ + public static <U,V> SensorTransformingEnricher<U,V> newInstanceTransforming(Entity producer, AttributeSensor<U> source, + Function<U,V> transformation, AttributeSensor<V> target) { + return new SensorTransformingEnricher<U,V>(producer, source, target, transformation); + } + + /** as {@link #newInstanceTransforming(Entity, AttributeSensor, Function, AttributeSensor)} + * using the same sensor as the source and the target */ + public static <T> SensorTransformingEnricher<T,T> newInstanceTransforming(Entity producer, AttributeSensor<T> sensor, + Function<T,T> transformation) { + return newInstanceTransforming(producer, sensor, transformation, sensor); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java new file mode 100644 index 0000000..ef23ab4 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/Transformer.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.util.collections.MutableSet; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.core.task.ValueResolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.reflect.TypeToken; + +//@Catalog(name="Transformer", description="Transforms attributes of an entity; see Enrichers.builder().transforming(...)") +@SuppressWarnings("serial") +public class Transformer<T,U> extends AbstractTransformer<T,U> { + + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(Transformer.class); + + // exactly one of these should be supplied to set a value + public static ConfigKey<?> TARGET_VALUE = ConfigKeys.newConfigKey(Object.class, "enricher.targetValue"); + public static ConfigKey<Function<?, ?>> TRANSFORMATION_FROM_VALUE = ConfigKeys.newConfigKey(new TypeToken<Function<?, ?>>() {}, "enricher.transformation"); + public static ConfigKey<Function<?, ?>> TRANSFORMATION_FROM_EVENT = ConfigKeys.newConfigKey(new TypeToken<Function<?, ?>>() {}, "enricher.transformation.fromevent"); + + public Transformer() { + } + + /** returns a function for transformation, for immediate use only (not for caching, as it may change) */ + @Override + @SuppressWarnings("unchecked") + protected Function<SensorEvent<T>, U> getTransformation() { + MutableSet<Object> suppliers = MutableSet.of(); + suppliers.addIfNotNull(config().getRaw(TARGET_VALUE).orNull()); + suppliers.addIfNotNull(config().getRaw(TRANSFORMATION_FROM_EVENT).orNull()); + suppliers.addIfNotNull(config().getRaw(TRANSFORMATION_FROM_VALUE).orNull()); + checkArgument(suppliers.size()==1, + "Must set exactly one of: %s, %s, %s", TARGET_VALUE.getName(), TRANSFORMATION_FROM_VALUE.getName(), TRANSFORMATION_FROM_EVENT.getName()); + + Function<?, ?> fromEvent = config().get(TRANSFORMATION_FROM_EVENT); + if (fromEvent != null) { + return (Function<SensorEvent<T>, U>) fromEvent; + } + + final Function<T, U> fromValueFn = (Function<T, U>) config().get(TRANSFORMATION_FROM_VALUE); + if (fromValueFn != null) { + // named class not necessary as result should not be serialized + return new Function<SensorEvent<T>, U>() { + @Override public U apply(SensorEvent<T> input) { + return fromValueFn.apply(input.getValue()); + } + @Override + public String toString() { + return ""+fromValueFn; + } + }; + } + + // from target value + // named class not necessary as result should not be serialized + final Object targetValueRaw = config().getRaw(TARGET_VALUE).orNull(); + return new Function<SensorEvent<T>, U>() { + @Override public U apply(SensorEvent<T> input) { + // evaluate immediately, or return null + // PRETTY_QUICK/200ms seems a reasonable compromise for tasks which require BG evaluation + // but which are non-blocking + // TODO better would be to have a mode in which tasks are not permitted to block on + // external events; they can submit tasks and block on them (or even better, have a callback architecture); + // however that is a non-trivial refactoring + return (U) Tasks.resolving(targetValueRaw).as(targetSensor.getType()) + .context(entity) + .description("Computing sensor "+targetSensor+" from "+targetValueRaw) + .timeout(ValueResolver.PRETTY_QUICK_WAIT) + .getMaybe().orNull(); + } + public String toString() { + return ""+targetValueRaw; + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java new file mode 100644 index 0000000..b09b6d6 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import java.util.Map; + +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.enricher.AbstractEnricher; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.flags.SetFromFlag; +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Maps; +import com.google.common.reflect.TypeToken; + +/** + * Enricher which updates an entry in a sensor map ({@link #TARGET_SENSOR}) + * based on the value of another sensor ({@link #SOURCE_SENSOR}. + * <p> + * The key used defaults to the name of the source sensor but can be specified with {@link #KEY_IN_TARGET_SENSOR}. + * The value placed in the map is the result of applying the function in {@link #COMPUTING} to the sensor value, + * with default behaviour being to remove an entry if <code>null</code> is returned + * but this can be overriden by setting {@link #REMOVING_IF_RESULT_IS_NULL} false. + * {@link Entities#REMOVE} and {@link Entities#UNCHANGED} are also respeced as return values for the computation + * (ignoring generics). + * Unlike most other enrichers, this defaults to {@link AbstractEnricher#SUPPRESS_DUPLICATES} being true + * + * @author alex + * + * @param <S> source sensor type + * @param <TKey> key type in target sensor map + * @param <TVal> value type in target sensor map + */ +@SuppressWarnings("serial") +public class UpdatingMap<S,TKey,TVal> extends AbstractEnricher implements SensorEventListener<S> { + + private static final Logger LOG = LoggerFactory.getLogger(UpdatingMap.class); + + @SetFromFlag("fromSensor") + public static final ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor"); + @SetFromFlag("targetSensor") + public static final ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); + @SetFromFlag("key") + public static final ConfigKey<?> KEY_IN_TARGET_SENSOR = ConfigKeys.newConfigKey(Object.class, "enricher.updatingMap.keyInTargetSensor", + "Key to update in the target sensor map, defaulting to the name of the source sensor"); + @SetFromFlag("computing") + public static final ConfigKey<Function<?, ?>> COMPUTING = ConfigKeys.newConfigKey(new TypeToken<Function<?,?>>() {}, "enricher.updatingMap.computing"); + @SetFromFlag("removingIfResultIsNull") + public static final ConfigKey<Boolean> REMOVING_IF_RESULT_IS_NULL = ConfigKeys.newBooleanConfigKey("enricher.updatingMap.removingIfResultIsNull", + "Whether the key in the target map is removed if the result if the computation is null"); + + protected AttributeSensor<S> sourceSensor; + protected AttributeSensor<Map<TKey,TVal>> targetSensor; + protected TKey key; + protected Function<S,? extends TVal> computing; + protected Boolean removingIfResultIsNull; + + public UpdatingMap() { + this(Maps.newLinkedHashMap()); + } + + public UpdatingMap(Map<Object, Object> flags) { + super(flags); + // this always suppresses duplicates, but it updates the same map *in place* so the usual suppress duplicates logic should not be applied + // TODO clean up so that we have synchronization guarantees and can inspect the item to see whether it has changed + suppressDuplicates = false; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + this.sourceSensor = (AttributeSensor<S>) getRequiredConfig(SOURCE_SENSOR); + this.targetSensor = (AttributeSensor<Map<TKey,TVal>>) getRequiredConfig(TARGET_SENSOR); + this.key = (TKey) getConfig(KEY_IN_TARGET_SENSOR); + this.computing = (Function) getRequiredConfig(COMPUTING); + this.removingIfResultIsNull = getConfig(REMOVING_IF_RESULT_IS_NULL); + + subscribe(entity, sourceSensor, this); + onUpdated(); + } + + @Override + public void onEvent(SensorEvent<S> event) { + onUpdated(); + } + + /** + * Called whenever the values for the set of producers changes (e.g. on an event, or on a member added/removed). + */ + @SuppressWarnings("unchecked") + protected void onUpdated() { + try { + Object v = computing.apply(entity.getAttribute(sourceSensor)); + if (v == null && !Boolean.FALSE.equals(removingIfResultIsNull)) { + v = Entities.REMOVE; + } + if (v == Entities.UNCHANGED) { + // nothing + } else { + // TODO check synchronization + TKey key = this.key; + if (key==null) key = (TKey) sourceSensor.getName(); + + Map<TKey, TVal> map = entity.getAttribute(targetSensor); + + boolean created = (map==null); + if (created) map = MutableMap.of(); + + boolean changed; + if (v == Entities.REMOVE) { + changed = map.containsKey(key); + if (changed) + map.remove(key); + } else { + TVal oldV = map.get(key); + if (oldV==null) + changed = (v!=null || !map.containsKey(key)); + else + changed = !oldV.equals(v); + if (changed) + map.put(key, (TVal)v); + } + if (changed || created) + emit(targetSensor, map); + } + } catch (Throwable t) { + LOG.warn("Error calculating map update for enricher "+this, t); + throw Exceptions.propagate(t); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/YamlRollingTimeWindowMeanEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/YamlRollingTimeWindowMeanEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/YamlRollingTimeWindowMeanEnricher.java new file mode 100644 index 0000000..f4e5484 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/YamlRollingTimeWindowMeanEnricher.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import java.util.Iterator; +import java.util.LinkedList; + +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.util.time.Duration; + +import com.google.common.base.Function; + +/** + * Transforms {@link Sensor} data into a rolling average based on a time window. + * + * All values within the window are weighted or discarded based on the timestamps associated with + * them (discards occur when a new value is added or an average is requested) + * <p> + * This will not extrapolate figures - it is assumed a value is valid and correct for the entire + * time period between it and the previous value. Normally, the average attribute is only updated + * when a new value arrives so it can give a fully informed average, but there is a danger of this + * going stale. + * <p> + * When an average is requested, it is likely there will be a segment of the window for which there + * isn't a value. Instead of extrapolating a value and providing different extrapolation techniques, + * the average is reported with a confidence value which reflects the fraction of the time + * window for which the values were valid. + * <p> + * Consumers of the average may ignore the confidence value and just use the last known average. + * They could multiply the returned value by the confidence value to get a decay-type behavior as + * the window empties. A third alternative is to, at a certain confidence threshold, report that + * the average is no longer meaningful. + * <p> + * The default average when no data has been received is 0, with a confidence of 0 + */ +public class YamlRollingTimeWindowMeanEnricher<T extends Number> extends AbstractTransformer<T,Double> { + + public static ConfigKey<Duration> WINDOW_DURATION = ConfigKeys.newConfigKey(Duration.class, "enricher.window.duration", + "Duration for which this window should store data, default one minute", Duration.ONE_MINUTE); + + public static ConfigKey<Double> CONFIDENCE_REQUIRED_TO_PUBLISH = ConfigKeys.newDoubleConfigKey("enricher.window.confidenceRequired", + "Minimum confidence level (ie period covered) required to publish a rolling average", 0.8d); + + public static class ConfidenceQualifiedNumber { + final Double value; + final double confidence; + + public ConfidenceQualifiedNumber(Double value, double confidence) { + this.value = value; + this.confidence = confidence; + } + + @Override + public String toString() { + return ""+value+" ("+(int)(confidence*100)+"%)"; + } + + } + + private final LinkedList<T> values = new LinkedList<T>(); + private final LinkedList<Long> timestamps = new LinkedList<Long>(); + volatile ConfidenceQualifiedNumber lastAverage = new ConfidenceQualifiedNumber(0d,0d); + + @Override + protected Function<SensorEvent<T>, Double> getTransformation() { + return new Function<SensorEvent<T>, Double>() { + @Override + public Double apply(SensorEvent<T> event) { + long eventTime = event.getTimestamp(); + if (event.getValue()==null) { + return null; + } + values.addLast(event.getValue()); + timestamps.addLast(eventTime); + if (eventTime>0) { + ConfidenceQualifiedNumber average = getAverage(eventTime, 0); + + if (average.confidence > getConfig(CONFIDENCE_REQUIRED_TO_PUBLISH)) { + // without confidence, we might publish wildly varying estimates, + // causing spurious resizes, so allow it to be configured, and + // by default require a high value + + // TODO would be nice to include timestamp, etc + return average.value; + } + } + return null; + } + }; + } + + public ConfidenceQualifiedNumber getAverage(long fromTime, long graceAllowed) { + if (timestamps.isEmpty()) { + return lastAverage = new ConfidenceQualifiedNumber(lastAverage.value, 0.0d); + } + + long firstTimestamp = -1; + Iterator<Long> ti = timestamps.iterator(); + while (ti.hasNext()) { + firstTimestamp = ti.next(); + if (firstTimestamp>0) break; + } + if (firstTimestamp<=0) { + // no values with reasonable timestamps + return lastAverage = new ConfidenceQualifiedNumber(values.get(values.size()-1).doubleValue(), 0.0d); + } + + long lastTimestamp = timestamps.get(timestamps.size()-1); + + long now = fromTime; + if (lastTimestamp > fromTime - graceAllowed) { + // without this, if the computation takes place X seconds after the publish, + // we treat X seconds as time for which we have no confidence in the data + now = lastTimestamp; + } + pruneValues(now); + + Duration timePeriod = getConfig(WINDOW_DURATION); + long windowStart = Math.max(now-timePeriod.toMilliseconds(), firstTimestamp); + long windowEnd = Math.max(now-timePeriod.toMilliseconds(), lastTimestamp); + Double confidence = ((double)(windowEnd - windowStart)) / timePeriod.toMilliseconds(); + if (confidence <= 0.0000001d) { + // not enough timestamps in window + double lastValue = values.get(values.size()-1).doubleValue(); + return lastAverage = new ConfidenceQualifiedNumber(lastValue, 0.0d); + } + + long start = windowStart; + long end; + double weightedAverage = 0.0d; + + Iterator<T> valuesIter = values.iterator(); + Iterator<Long> timestampsIter = timestamps.iterator(); + while (valuesIter.hasNext()) { + // Ignores null and out-of-date values (and also values that are received out-of-order, but that shouldn't happen!) + Number val = valuesIter.next(); + Long timestamp = timestampsIter.next(); + if (val!=null && timestamp >= start) { + end = timestamp; + weightedAverage += ((end - start) / (confidence * timePeriod.toMilliseconds())) * val.doubleValue(); + start = timestamp; + } + } + + return lastAverage = new ConfidenceQualifiedNumber(weightedAverage, confidence); + } + + /** + * Discards out-of-date values, but keeps at least one value. + */ + private void pruneValues(long now) { + // keep one value from before the period, so that we can tell the window's start time + Duration timePeriod = getConfig(WINDOW_DURATION); + while(timestamps.size() > 1 && timestamps.get(1) < (now - timePeriod.toMilliseconds())) { + timestamps.removeFirst(); + values.removeFirst(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/enricher/stock/YamlTimeWeightedDeltaEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/YamlTimeWeightedDeltaEnricher.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/YamlTimeWeightedDeltaEnricher.java new file mode 100644 index 0000000..e1a661e --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/YamlTimeWeightedDeltaEnricher.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.enricher.stock; + +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.enricher.stock.AbstractTransformer; +import org.apache.brooklyn.util.core.flags.TypeCoercions; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; + +/** + * Converts an absolute count sensor into a delta sensor (i.e. the diff between the current and previous value), + * presented as a units/timeUnit based on the event timing. + * <p> + * For example, given a requests.count sensor, this can make a requests.per_sec sensor with {@link #DELTA_PERIOD} set to "1s" (the default). + * <p> + * Suitable for configuration from YAML. + */ +public class YamlTimeWeightedDeltaEnricher<T extends Number> extends AbstractTransformer<T,Double> { + private static final Logger LOG = LoggerFactory.getLogger(YamlTimeWeightedDeltaEnricher.class); + + transient Object lock = new Object(); + Number lastValue; + long lastTime = -1; + + public static ConfigKey<Duration> DELTA_PERIOD = ConfigKeys.newConfigKey(Duration.class, "enricher.delta.period", + "Duration that this delta should compute for, default per second", Duration.ONE_SECOND); + + @Override + protected Function<SensorEvent<T>, Double> getTransformation() { + return new Function<SensorEvent<T>, Double>() { + @Override + public Double apply(SensorEvent<T> event) { + synchronized (lock) { + Double current = TypeCoercions.coerce(event.getValue(), Double.class); + + if (current == null) return null; + + long eventTime = event.getTimestamp(); + long unitMillis = getConfig(DELTA_PERIOD).toMilliseconds(); + Double result = null; + + if (eventTime > 0 && eventTime > lastTime) { + if (lastValue == null || lastTime < 0) { + // cannot calculate time-based delta with a single value + if (LOG.isTraceEnabled()) LOG.trace("{} received event but no last value so will not emit, null -> {} at {}", new Object[] {this, current, eventTime}); + } else { + double duration = eventTime - lastTime; + result = (current - lastValue.doubleValue()) / (duration / unitMillis); + } + } + + lastValue = current; + lastTime = eventTime; + + return result; + } + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/entity/group/DynamicFabricImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicFabricImpl.java b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicFabricImpl.java index daedc39..717b0e4 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/group/DynamicFabricImpl.java +++ b/core/src/main/java/org/apache/brooklyn/entity/group/DynamicFabricImpl.java @@ -41,7 +41,7 @@ import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; import org.apache.brooklyn.core.entity.trait.Changeable; import org.apache.brooklyn.core.entity.trait.Startable; -import org.apache.brooklyn.sensor.enricher.Enrichers; +import org.apache.brooklyn.enricher.stock.Enrichers; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.groovy.GroovyJavaMethods; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/entity/stock/DelegateEntityImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/entity/stock/DelegateEntityImpl.java b/core/src/main/java/org/apache/brooklyn/entity/stock/DelegateEntityImpl.java index a160421..5728635 100644 --- a/core/src/main/java/org/apache/brooklyn/entity/stock/DelegateEntityImpl.java +++ b/core/src/main/java/org/apache/brooklyn/entity/stock/DelegateEntityImpl.java @@ -20,7 +20,7 @@ package org.apache.brooklyn.entity.stock; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.core.entity.AbstractEntity; -import org.apache.brooklyn.sensor.enricher.Enrichers; +import org.apache.brooklyn.enricher.stock.Enrichers; import com.google.common.base.Preconditions; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractAggregatingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractAggregatingEnricher.java b/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractAggregatingEnricher.java deleted file mode 100644 index c140c9f..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractAggregatingEnricher.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.enricher; - -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.core.entity.trait.Changeable; -import org.apache.brooklyn.util.groovy.GroovyJavaMethods; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableMap; - - -/** - * AggregatingEnrichers implicitly subscribes to the same sensor<S> on all entities inside an - * {@link Group} and should emit an aggregate<T> on the target sensor - * - * @deprecated since 0.7.0; use {@link Enrichers.builder()} - * @see Aggregator if need to sub-class - */ -public abstract class AbstractAggregatingEnricher<S,T> extends AbstractEnricher implements SensorEventListener<S> { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractAggregatingEnricher.class); - - AttributeSensor<? extends S> source; - protected AttributeSensor<T> target; - protected S defaultValue; - - Set<Entity> producers; - List<Entity> hardCodedProducers; - boolean allMembers; - Predicate<Entity> filter; - - /** - * Users of values should either on it synchronize when iterating over its entries or use - * copyOfValues to obtain an immutable copy of the map. - */ - // We use a synchronizedMap over a ConcurrentHashMap for entities that store null values. - protected final Map<Entity, S> values = Collections.synchronizedMap(new LinkedHashMap<Entity, S>()); - - public AbstractAggregatingEnricher(Map<String,?> flags, AttributeSensor<? extends S> source, AttributeSensor<T> target) { - this(flags, source, target, null); - } - - @SuppressWarnings("unchecked") - public AbstractAggregatingEnricher(Map<String,?> flags, AttributeSensor<? extends S> source, AttributeSensor<T> target, S defaultValue) { - super(flags); - this.source = source; - this.target = target; - this.defaultValue = defaultValue; - hardCodedProducers = (List<Entity>) (flags.containsKey("producers") ? flags.get("producers") : Collections.emptyList()); - allMembers = (Boolean) (flags.containsKey("allMembers") ? flags.get("allMembers") : false); - filter = flags.containsKey("filter") ? GroovyJavaMethods.<Entity>castToPredicate(flags.get("filter")) : Predicates.<Entity>alwaysTrue(); - } - - public void addProducer(Entity producer) { - if (LOG.isDebugEnabled()) LOG.debug("{} linked ({}, {}) to {}", new Object[] {this, producer, source, target}); - subscribe(producer, source, this); - synchronized (values) { - S vo = values.get(producer); - if (vo==null) { - S initialVal = ((EntityLocal)producer).getAttribute(source); - values.put(producer, initialVal != null ? initialVal : defaultValue); - //we might skip in onEvent in the short window while !values.containsKey(producer) - //but that's okay because the put which would have been done there is done here now - } else { - //vo will be null unless some weird race with addProducer+removeProducer is occuring - //(and that's something we can tolerate i think) - if (LOG.isDebugEnabled()) LOG.debug("{} already had value ({}) for producer ({}); but that producer has just been added", new Object[] {this, vo, producer}); - } - } - onUpdated(); - } - - // TODO If producer removed but then get (queued) event from it after this method returns, - public S removeProducer(Entity producer) { - if (LOG.isDebugEnabled()) LOG.debug("{} unlinked ({}, {}) from {}", new Object[] {this, producer, source, target}); - unsubscribe(producer); - S removed = values.remove(producer); - onUpdated(); - return removed; - } - - @Override - public void onEvent(SensorEvent<S> event) { - Entity e = event.getSource(); - synchronized (values) { - if (values.containsKey(e)) { - values.put(e, event.getValue()); - } else { - if (LOG.isDebugEnabled()) LOG.debug("{} received event for unknown producer ({}); presumably that producer has recently been removed", this, e); - } - } - onUpdated(); - } - - /** - * Called whenever the values for the set of producers changes (e.g. on an event, or on a member added/removed). - * Defaults to no-op - */ - // TODO should this be abstract? - protected void onUpdated() { - // no-op - } - - @Override - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - - for (Entity producer : hardCodedProducers) { - if (filter.apply(producer)) { - addProducer(producer); - } - } - - if (allMembers) { - subscribe(entity, Changeable.MEMBER_ADDED, new SensorEventListener<Entity>() { - @Override public void onEvent(SensorEvent<Entity> it) { - if (filter.apply(it.getValue())) addProducer(it.getValue()); - } - }); - subscribe(entity, Changeable.MEMBER_REMOVED, new SensorEventListener<Entity>() { - @Override public void onEvent(SensorEvent<Entity> it) { - removeProducer(it.getValue()); - } - }); - - if (entity instanceof Group) { - for (Entity member : ((Group)entity).getMembers()) { - if (filter.apply(member)) { - addProducer(member); - } - } - } - } - } - - protected Map<Entity, S> copyOfValues() { - synchronized (values) { - return ImmutableMap.copyOf(values); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractAggregator.java b/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractAggregator.java deleted file mode 100644 index a06e6d6..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractAggregator.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.enricher; - -import static com.google.common.base.Preconditions.checkState; - -import java.util.Set; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.sensor.Sensor; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.AbstractEntity; -import org.apache.brooklyn.core.entity.trait.Changeable; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.guava.Maybe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; -import com.google.common.reflect.TypeToken; - -/** Abstract superclass for enrichers which aggregate from children and/or members */ -@SuppressWarnings("serial") -public abstract class AbstractAggregator<T,U> extends AbstractEnricher implements SensorEventListener<T> { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractAggregator.class); - - public static final ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer", "The entity whose children/members will be aggregated"); - - public static final ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); - - // FIXME this is not just for "members" i think -Alex - public static final ConfigKey<?> DEFAULT_MEMBER_VALUE = ConfigKeys.newConfigKey(Object.class, "enricher.defaultMemberValue"); - - public static final ConfigKey<Set<? extends Entity>> FROM_HARDCODED_PRODUCERS = ConfigKeys.newConfigKey(new TypeToken<Set<? extends Entity>>() {}, "enricher.aggregating.fromHardcodedProducers"); - - public static final ConfigKey<Boolean> FROM_MEMBERS = ConfigKeys.newBooleanConfigKey("enricher.aggregating.fromMembers", - "Whether this enricher looks at members; only supported if a Group producer is supplier; defaults to true for Group entities"); - - public static final ConfigKey<Boolean> FROM_CHILDREN = ConfigKeys.newBooleanConfigKey("enricher.aggregating.fromChildren", - "Whether this enricher looks at children; this is the default for non-Group producers"); - - public static final ConfigKey<Predicate<? super Entity>> ENTITY_FILTER = ConfigKeys.newConfigKey(new TypeToken<Predicate<? super Entity>>() {}, "enricher.aggregating.entityFilter"); - - public static final ConfigKey<Predicate<?>> VALUE_FILTER = ConfigKeys.newConfigKey(new TypeToken<Predicate<?>>() {}, "enricher.aggregating.valueFilter"); - - protected Entity producer; - protected Sensor<U> targetSensor; - protected T defaultMemberValue; - protected Set<? extends Entity> fromHardcodedProducers; - protected Boolean fromMembers; - protected Boolean fromChildren; - protected Predicate<? super Entity> entityFilter; - protected Predicate<? super T> valueFilter; - - public AbstractAggregator() {} - - @Override - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - setEntityLoadingConfig(); - - if (fromHardcodedProducers == null && producer == null) producer = entity; - checkState(fromHardcodedProducers != null ^ producer != null, "must specify one of %s (%s) or %s (%s)", - PRODUCER.getName(), producer, FROM_HARDCODED_PRODUCERS.getName(), fromHardcodedProducers); - - if (fromHardcodedProducers != null) { - for (Entity producer : Iterables.filter(fromHardcodedProducers, entityFilter)) { - addProducerHardcoded(producer); - } - } - - if (isAggregatingMembers()) { - setEntityBeforeSubscribingProducerMemberEvents(entity); - setEntitySubscribeProducerMemberEvents(); - setEntityAfterSubscribingProducerMemberEvents(); - } - - if (isAggregatingChildren()) { - setEntityBeforeSubscribingProducerChildrenEvents(); - setEntitySubscribingProducerChildrenEvents(); - setEntityAfterSubscribingProducerChildrenEvents(); - } - - onUpdated(); - } - - @SuppressWarnings({ "unchecked" }) - protected void setEntityLoadingConfig() { - this.producer = getConfig(PRODUCER); - this.fromHardcodedProducers= getConfig(FROM_HARDCODED_PRODUCERS); - this.defaultMemberValue = (T) getConfig(DEFAULT_MEMBER_VALUE); - this.fromMembers = Maybe.fromNullable(getConfig(FROM_MEMBERS)).or(fromMembers); - this.fromChildren = Maybe.fromNullable(getConfig(FROM_CHILDREN)).or(fromChildren); - this.entityFilter = (Predicate<? super Entity>) (getConfig(ENTITY_FILTER) == null ? Predicates.alwaysTrue() : getConfig(ENTITY_FILTER)); - this.valueFilter = (Predicate<? super T>) (getConfig(VALUE_FILTER) == null ? getDefaultValueFilter() : getConfig(VALUE_FILTER)); - - setEntityLoadingTargetConfig(); - } - - protected Predicate<?> getDefaultValueFilter() { - return Predicates.alwaysTrue(); - } - - @SuppressWarnings({ "unchecked" }) - protected void setEntityLoadingTargetConfig() { - this.targetSensor = (Sensor<U>) getRequiredConfig(TARGET_SENSOR); - } - - protected void setEntityBeforeSubscribingProducerMemberEvents(EntityLocal entity) { - checkState(producer instanceof Group, "Producer must be a group when fromMembers true: producer=%s; entity=%s; " - + "hardcodedProducers=%s", getConfig(PRODUCER), entity, fromHardcodedProducers); - } - - protected void setEntitySubscribeProducerMemberEvents() { - subscribe(producer, Changeable.MEMBER_ADDED, new SensorEventListener<Entity>() { - @Override public void onEvent(SensorEvent<Entity> event) { - if (entityFilter.apply(event.getValue())) { - addProducerMember(event.getValue()); - onUpdated(); - } - } - }); - subscribe(producer, Changeable.MEMBER_REMOVED, new SensorEventListener<Entity>() { - @Override public void onEvent(SensorEvent<Entity> event) { - removeProducer(event.getValue()); - onUpdated(); - } - }); - } - - protected void setEntityAfterSubscribingProducerMemberEvents() { - if (producer instanceof Group) { - for (Entity member : Iterables.filter(((Group)producer).getMembers(), entityFilter)) { - addProducerMember(member); - } - } - } - - protected void setEntityBeforeSubscribingProducerChildrenEvents() { - } - - protected void setEntitySubscribingProducerChildrenEvents() { - subscribe(producer, AbstractEntity.CHILD_REMOVED, new SensorEventListener<Entity>() { - @Override public void onEvent(SensorEvent<Entity> event) { - removeProducer(event.getValue()); - onUpdated(); - } - }); - subscribe(producer, AbstractEntity.CHILD_ADDED, new SensorEventListener<Entity>() { - @Override public void onEvent(SensorEvent<Entity> event) { - if (entityFilter.apply(event.getValue())) { - addProducerChild(event.getValue()); - onUpdated(); - } - } - }); - } - - protected void setEntityAfterSubscribingProducerChildrenEvents() { - for (Entity child : Iterables.filter(producer.getChildren(), entityFilter)) { - addProducerChild(child); - } - } - - /** true if this should aggregate members */ - protected boolean isAggregatingMembers() { - if (Boolean.TRUE.equals(fromMembers)) return true; - if (Boolean.TRUE.equals(fromChildren)) return false; - if (fromHardcodedProducers!=null) return false; - if (producer instanceof Group) return true; - return false; - } - - /** true if this should aggregate members */ - protected boolean isAggregatingChildren() { - if (Boolean.TRUE.equals(fromChildren)) return true; - if (Boolean.TRUE.equals(fromMembers)) return false; - if (fromHardcodedProducers!=null) return false; - if (producer instanceof Group) return false; - return true; - } - - protected abstract void addProducerHardcoded(Entity producer); - protected abstract void addProducerMember(Entity producer); - protected abstract void addProducerChild(Entity producer); - - // TODO If producer removed but then get (queued) event from it after this method returns, - protected void removeProducer(Entity producer) { - if (LOG.isDebugEnabled()) LOG.debug("{} stopped listening to {}", new Object[] {this, producer }); - unsubscribe(producer); - onProducerRemoved(producer); - } - - protected abstract void onProducerAdded(Entity producer); - - protected abstract void onProducerRemoved(Entity producer); - - - /** - * Called whenever the values for the set of producers changes (e.g. on an event, or on a member added/removed). - */ - protected void onUpdated() { - try { - emit(targetSensor, compute()); - } catch (Throwable t) { - LOG.warn("Error calculating and setting aggregate for enricher "+this, t); - throw Exceptions.propagate(t); - } - } - - protected abstract Object compute(); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractEnricher.java b/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractEnricher.java deleted file mode 100644 index edf407d..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractEnricher.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.enricher; - -import static com.google.common.base.Preconditions.checkState; - -import java.util.Map; - -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.mgmt.rebind.RebindSupport; -import org.apache.brooklyn.api.mgmt.rebind.mementos.EnricherMemento; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.api.sensor.Enricher; -import org.apache.brooklyn.api.sensor.EnricherType; -import org.apache.brooklyn.api.sensor.Sensor; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.core.mgmt.rebind.BasicEnricherRebindSupport; -import org.apache.brooklyn.core.objs.AbstractEntityAdjunct; -import org.apache.brooklyn.util.core.flags.TypeCoercions; - -import com.google.common.base.Objects; -import com.google.common.collect.Maps; - -/** -* Base {@link Enricher} implementation; all enrichers should extend this or its children -*/ -public abstract class AbstractEnricher extends AbstractEntityAdjunct implements Enricher { - - public static final ConfigKey<Boolean> SUPPRESS_DUPLICATES = ConfigKeys.newBooleanConfigKey("enricher.suppressDuplicates", - "Whether duplicate values published by this enricher should be suppressed"); - - private final EnricherDynamicType enricherType; - protected Boolean suppressDuplicates; - - public AbstractEnricher() { - this(Maps.newLinkedHashMap()); - } - - public AbstractEnricher(Map<?,?> flags) { - super(flags); - - enricherType = new EnricherDynamicType(this); - - if (isLegacyConstruction() && !isLegacyNoConstructionInit()) { - init(); - } - } - - @Override - public RebindSupport<EnricherMemento> getRebindSupport() { - return new BasicEnricherRebindSupport(this); - } - - @Override - public EnricherType getEnricherType() { - return enricherType.getSnapshot(); - } - - @Override - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - Boolean suppressDuplicates = getConfig(SUPPRESS_DUPLICATES); - if (suppressDuplicates!=null) - this.suppressDuplicates = suppressDuplicates; - } - - @Override - protected void onChanged() { - requestPersist(); - } - - @Override - protected <T> void emit(Sensor<T> sensor, Object val) { - checkState(entity != null, "entity must first be set"); - if (val == Entities.UNCHANGED) { - return; - } - if (val == Entities.REMOVE) { - ((EntityInternal)entity).removeAttribute((AttributeSensor<T>) sensor); - return; - } - - T newVal = TypeCoercions.coerce(val, sensor.getTypeToken()); - if (sensor instanceof AttributeSensor) { - if (Boolean.TRUE.equals(suppressDuplicates)) { - T oldValue = entity.getAttribute((AttributeSensor<T>)sensor); - if (Objects.equal(oldValue, newVal)) - return; - } - entity.setAttribute((AttributeSensor<T>)sensor, newVal); - } else { - entity.emit(sensor, newVal); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractMultipleSensorAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractMultipleSensorAggregator.java b/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractMultipleSensorAggregator.java deleted file mode 100644 index c625c90..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractMultipleSensorAggregator.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.enricher; - -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.api.sensor.Sensor; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.core.BrooklynLogging; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.flags.TypeCoercions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; - -/** Building on {@link AbstractAggregator} for a single source sensor (on multiple children and/or members) */ -public abstract class AbstractMultipleSensorAggregator<U> extends AbstractAggregator<Object,U> implements SensorEventListener<Object> { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractMultipleSensorAggregator.class); - - - /** access via {@link #getValues(Sensor)} */ - private final Map<String, Map<Entity,Object>> values = Collections.synchronizedMap(new LinkedHashMap<String, Map<Entity,Object>>()); - - public AbstractMultipleSensorAggregator() {} - - protected abstract Collection<Sensor<?>> getSourceSensors(); - - @Override - protected void setEntityLoadingConfig() { - super.setEntityLoadingConfig(); - Preconditions.checkNotNull(getSourceSensors(), "sourceSensors must be set"); - } - - @Override - protected void setEntityBeforeSubscribingProducerChildrenEvents() { - BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer), - "{} subscribing to children of {}", this, producer); - for (Sensor<?> sourceSensor: getSourceSensors()) { - subscribeToChildren(producer, sourceSensor, this); - } - } - - @Override - protected void addProducerHardcoded(Entity producer) { - for (Sensor<?> sourceSensor: getSourceSensors()) { - subscribe(producer, sourceSensor, this); - } - onProducerAdded(producer); - } - - @Override - protected void addProducerChild(Entity producer) { - // no `subscribe` call needed here, due to previous subscribeToChildren call - onProducerAdded(producer); - } - - @Override - protected void addProducerMember(Entity producer) { - addProducerHardcoded(producer); - } - - @Override - protected void onProducerAdded(Entity producer) { - BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer), - "{} listening to {}", this, producer); - synchronized (values) { - for (Sensor<?> sensor: getSourceSensors()) { - Map<Entity,Object> vs = values.get(sensor.getName()); - if (vs==null) { - vs = new LinkedHashMap<Entity,Object>(); - values.put(sensor.getName(), vs); - } - - Object vo = vs.get(producer); - if (vo==null) { - Object initialVal; - if (sensor instanceof AttributeSensor) { - initialVal = producer.getAttribute((AttributeSensor<?>)sensor); - } else { - initialVal = null; - } - vs.put(producer, initialVal != null ? initialVal : defaultMemberValue); - // NB: see notes on possible race, in Aggregator#onProducerAdded - } - - } - } - } - - @Override - protected void onProducerRemoved(Entity producer) { - synchronized (values) { - for (Sensor<?> sensor: getSourceSensors()) { - Map<Entity,Object> vs = values.get(sensor.getName()); - if (vs!=null) - vs.remove(producer); - } - } - onUpdated(); - } - - @Override - public void onEvent(SensorEvent<Object> event) { - Entity e = event.getSource(); - synchronized (values) { - Map<Entity,Object> vs = values.get(event.getSensor().getName()); - if (vs==null) { - LOG.debug(this+" received event when no entry for sensor ("+event+"); likely just added or removed, and will initialize subsequently if needed"); - } else { - vs.put(e, event.getValue()); - } - } - onUpdated(); - } - - public <T> Map<Entity,T> getValues(Sensor<T> sensor) { - Map<Entity, T> valuesCopy = copyValues(sensor); - return coerceValues(valuesCopy, sensor.getType()); - } - - private <T> Map<Entity, T> coerceValues(Map<Entity, T> values, Class<? super T> type) { - Map<Entity, T> typedValues = MutableMap.of(); - for (Entry<Entity, T> entry : values.entrySet()) { - @SuppressWarnings("unchecked") - T typedValue = (T) TypeCoercions.coerce(entry.getValue(), type); - typedValues.put(entry.getKey(), typedValue); - } - return typedValues; - } - - private <T> Map<Entity, T> copyValues(Sensor<T> sensor) { - synchronized (values) { - @SuppressWarnings("unchecked") - Map<Entity, T> sv = (Map<Entity, T>) values.get(sensor.getName()); - //use MutableMap because of potentially null values - return MutableMap.copyOf(sv).asUnmodifiable(); - } - } - - @Override - protected abstract Object compute(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTransformer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTransformer.java b/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTransformer.java deleted file mode 100644 index 1ff7938..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTransformer.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.enricher; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.api.sensor.Sensor; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.sensor.BasicSensorEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.reflect.TypeToken; - -@SuppressWarnings("serial") -public abstract class AbstractTransformer<T,U> extends AbstractEnricher implements SensorEventListener<T> { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractTransformer.class); - - public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer"); - - public static ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor"); - - public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor"); - - protected Entity producer; - protected Sensor<T> sourceSensor; - protected Sensor<U> targetSensor; - - public AbstractTransformer() { - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - - Function<SensorEvent<T>, U> transformation = getTransformation(); - this.producer = getConfig(PRODUCER) == null ? entity: getConfig(PRODUCER); - this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR); - Sensor<?> targetSensorSpecified = getConfig(TARGET_SENSOR); - this.targetSensor = targetSensorSpecified!=null ? (Sensor<U>) targetSensorSpecified : (Sensor<U>) this.sourceSensor; - if (producer.equals(entity) && targetSensorSpecified==null) { - LOG.error("Refusing to add an enricher which reads and publishes on the same sensor: "+ - producer+"."+sourceSensor+" (computing "+transformation+")"); - // we don't throw because this error may manifest itself after a lengthy deployment, - // and failing it at that point simply because of an enricher is not very pleasant - // (at least not until we have good re-run support across the board) - return; - } - - subscribe(producer, sourceSensor, this); - - if (sourceSensor instanceof AttributeSensor) { - Object value = producer.getAttribute((AttributeSensor<?>)sourceSensor); - // TODO would be useful to have a convenience to "subscribeAndThenIfItIsAlreadySetRunItOnce" - if (value!=null) { - onEvent(new BasicSensorEvent(sourceSensor, producer, value, -1)); - } - } - } - - /** returns a function for transformation, for immediate use only (not for caching, as it may change) */ - protected abstract Function<SensorEvent<T>, U> getTransformation(); - - @Override - public void onEvent(SensorEvent<T> event) { - emit(targetSensor, compute(event)); - } - - protected Object compute(SensorEvent<T> event) { - // transformation is not going to change, but this design makes it easier to support changing config in future. - // if it's an efficiency hole we can switch to populate the transformation at start. - U result = getTransformation().apply(event); - if (LOG.isTraceEnabled()) - LOG.trace("Enricher "+this+" computed "+result+" from "+event); - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTransformingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTransformingEnricher.java b/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTransformingEnricher.java deleted file mode 100644 index 4fea37a..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTransformingEnricher.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.enricher; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.sensor.Sensor; - -/** - * Convenience base for transforming a single sensor into a single new sensor of the same type - * - * @deprecated since 0.7.0; use {@link Enrichers.builder()} - */ -public abstract class AbstractTransformingEnricher<T> extends AbstractTypeTransformingEnricher<T,T> { - - public AbstractTransformingEnricher() { // for rebinding - } - - public AbstractTransformingEnricher(Entity producer, Sensor<T> source, Sensor<T> target) { - super(producer, source, target); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTypeTransformingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTypeTransformingEnricher.java b/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTypeTransformingEnricher.java deleted file mode 100644 index f66004c..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AbstractTypeTransformingEnricher.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.enricher; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.api.sensor.Sensor; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.core.sensor.BasicSensorEvent; -import org.apache.brooklyn.util.core.flags.SetFromFlag; - -/** - * Convenience base for transforming a single sensor into a single new sensor of the same type - * - * @deprecated since 0.7.0; use {@link Enrichers.builder()} - */ -public abstract class AbstractTypeTransformingEnricher<T,U> extends AbstractEnricher implements SensorEventListener<T> { - - @SetFromFlag - private Entity producer; - - @SetFromFlag - private Sensor<T> source; - - @SetFromFlag - protected Sensor<U> target; - - public AbstractTypeTransformingEnricher() { // for rebind - } - - public AbstractTypeTransformingEnricher(Entity producer, Sensor<T> source, Sensor<U> target) { - this.producer = producer; - this.source = source; - this.target = target; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - if (producer==null) producer = entity; - subscribe(producer, source, this); - - if (source instanceof AttributeSensor) { - Object value = producer.getAttribute((AttributeSensor)source); - // TODO Aled didn't you write a convenience to "subscribeAndRunIfSet" ? (-Alex) - if (value!=null) - onEvent(new BasicSensorEvent(source, producer, value, -1)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/sensor/enricher/AddingEnricher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AddingEnricher.java b/core/src/main/java/org/apache/brooklyn/sensor/enricher/AddingEnricher.java deleted file mode 100644 index 997f974..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/enricher/AddingEnricher.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.enricher; - -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.api.sensor.Sensor; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.core.sensor.BasicSensorEvent; - -/** - * enricher which adds multiple sensors on an entity to produce a new sensor - * - * Instead, consider calling: - * <pre> - * {@code - * addEnricher(Enrichers.builder() - * .combining(sources) - * .publishing(target) - * .computeSum() - * .build()); - * } - * </pre> - * <p> - * - * @deprecated since 0.7.0; use {@link Enrichers.builder()} - * @see Combiner if need to sub-class - */ -public class AddingEnricher extends AbstractEnricher implements SensorEventListener { - - private Sensor[] sources; - private Sensor<? extends Number> target; - - public AddingEnricher(Sensor sources[], Sensor<? extends Number> target) { - this.sources = sources; - this.target = target; - } - - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - - for (Sensor source: sources) { - subscribe(entity, source, this); - if (source instanceof AttributeSensor) { - Object value = entity.getAttribute((AttributeSensor)source); - if (value!=null) - onEvent(new BasicSensorEvent(source, entity, value, -1)); - } - } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public void onEvent(SensorEvent event) { - Number value = recompute(); - Number typedValue = cast(value, (Class<? extends Number>)target.getType()); - if (target instanceof AttributeSensor) { - entity.setAttribute((AttributeSensor)target, typedValue); - } else if (typedValue!=null) - entity.emit((Sensor)target, typedValue); - } - - @SuppressWarnings("unchecked") - public static <V> V cast(Number value, Class<V> type) { - if (value==null) return null; - if (type.isInstance(value)) return (V)value; - - if (type==Integer.class) return (V) (Integer) (int)Math.round(value.doubleValue()); - if (type==Long.class) return (V) (Long) Math.round(value.doubleValue()); - if (type==Double.class) return (V) (Double) value.doubleValue(); - if (type==Float.class) return (V) (Float) value.floatValue(); - if (type==Byte.class) return (V) (Byte) (byte)Math.round(value.doubleValue()); - if (type==Short.class) return (V) (Short) (short)Math.round(value.doubleValue()); - - throw new UnsupportedOperationException("conversion of mathematical operation to "+type+" not supported"); - } - - protected Number recompute() { - if (sources.length==0) return null; - Double result = 0d; - for (Sensor source: sources) { - Object value = entity.getAttribute((AttributeSensor) source); - if (value==null) return null; - result += ((Number)value).doubleValue(); - } - return result; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f15e8a6/core/src/main/java/org/apache/brooklyn/sensor/enricher/Aggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/enricher/Aggregator.java b/core/src/main/java/org/apache/brooklyn/sensor/enricher/Aggregator.java deleted file mode 100644 index af828bc..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/enricher/Aggregator.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.enricher; - -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.api.sensor.Sensor; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.BrooklynLogging; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.flags.SetFromFlag; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.text.StringPredicates; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; -import com.google.common.reflect.TypeToken; - -/** Building on {@link AbstractAggregator} for a single source sensor (on multiple children and/or members) */ -@SuppressWarnings("serial") -//@Catalog(name="Aggregator", description="Aggregates attributes from multiple entities into a single attribute value; see Enrichers.builder().aggregating(...)") -public class Aggregator<T,U> extends AbstractAggregator<T,U> implements SensorEventListener<T> { - - private static final Logger LOG = LoggerFactory.getLogger(Aggregator.class); - - public static final ConfigKey<Sensor<?>> SOURCE_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.sourceSensor"); - - @SetFromFlag("transformation") - public static final ConfigKey<Object> TRANSFORMATION_UNTYPED = ConfigKeys.newConfigKey(Object.class, "enricher.transformation.untyped", - "Specifies a transformation, as a function from a collection to the value, or as a string matching a pre-defined named transformation, " - + "such as 'average' (for numbers), 'sum' (for numbers), or 'list' (the default, putting any collection of items into a list)"); - public static final ConfigKey<Function<? super Collection<?>, ?>> TRANSFORMATION = ConfigKeys.newConfigKey(new TypeToken<Function<? super Collection<?>, ?>>() {}, "enricher.transformation"); - - public static final ConfigKey<Boolean> EXCLUDE_BLANK = ConfigKeys.newBooleanConfigKey("enricher.aggregator.excludeBlank", "Whether explicit nulls or blank strings should be excluded (default false); this only applies if no value filter set", false); - - protected Sensor<T> sourceSensor; - protected Function<? super Collection<T>, ? extends U> transformation; - - /** - * Users of values should either on it synchronize when iterating over its entries or use - * copyOfValues to obtain an immutable copy of the map. - */ - // We use a synchronizedMap over a ConcurrentHashMap for entities that store null values. - protected final Map<Entity, T> values = Collections.synchronizedMap(new LinkedHashMap<Entity, T>()); - - public Aggregator() {} - - @SuppressWarnings("unchecked") - protected void setEntityLoadingConfig() { - super.setEntityLoadingConfig(); - this.sourceSensor = (Sensor<T>) getRequiredConfig(SOURCE_SENSOR); - - this.transformation = (Function<? super Collection<T>, ? extends U>) config().get(TRANSFORMATION); - - Object t1 = config().get(TRANSFORMATION_UNTYPED); - Function<? super Collection<?>, ?> t2 = null; - if (t1 instanceof String) { - t2 = lookupTransformation((String)t1); - if (t2==null) { - LOG.warn("Unknown transformation '"+t1+"' for "+this+"; will use default transformation"); - } - } - - if (this.transformation==null) { - this.transformation = (Function<? super Collection<T>, ? extends U>) t2; - } else if (t1!=null && !Objects.equals(t2, this.transformation)) { - throw new IllegalStateException("Cannot supply both "+TRANSFORMATION_UNTYPED+" and "+TRANSFORMATION+" unless they are equal."); - } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - protected Function<? super Collection<?>, ?> lookupTransformation(String t1) { - if ("average".equalsIgnoreCase(t1)) return new Enrichers.ComputingAverage(null, null, targetSensor.getTypeToken()); - if ("sum".equalsIgnoreCase(t1)) return new Enrichers.ComputingAverage(null, null, targetSensor.getTypeToken()); - if ("list".equalsIgnoreCase(t1)) return new ComputingList(); - return null; - } - - private class ComputingList<TT> implements Function<Collection<TT>, List<TT>> { - @Override - public List<TT> apply(Collection<TT> input) { - if (input==null) return null; - return MutableList.copyOf(input).asUnmodifiable(); - } - - } - - @Override - protected void setEntityBeforeSubscribingProducerChildrenEvents() { - BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer), - "{} subscribing to children of {}", this, producer); - subscribeToChildren(producer, sourceSensor, this); - } - - @Override - protected void addProducerHardcoded(Entity producer) { - subscribe(producer, sourceSensor, this); - onProducerAdded(producer); - } - - @Override - protected void addProducerChild(Entity producer) { - // no subscription needed here, due to the subscribeToChildren call - onProducerAdded(producer); - } - - @Override - protected void addProducerMember(Entity producer) { - subscribe(producer, sourceSensor, this); - onProducerAdded(producer); - } - - @Override - protected void onProducerAdded(Entity producer) { - BrooklynLogging.log(LOG, BrooklynLogging.levelDebugOrTraceIfReadOnly(producer), - "{} listening to {}", this, producer); - synchronized (values) { - T vo = values.get(producer); - if (vo==null) { - T initialVal; - if (sourceSensor instanceof AttributeSensor) { - initialVal = producer.getAttribute((AttributeSensor<T>)sourceSensor); - } else { - initialVal = null; - } - values.put(producer, initialVal != null ? initialVal : defaultMemberValue); - //we might skip in onEvent in the short window while !values.containsKey(producer) - //but that's okay because the put which would have been done there is done here now - } else { - //vo will be null unless some weird race with addProducer+removeProducer is occuring - //(and that's something we can tolerate i think) - if (LOG.isDebugEnabled()) LOG.debug("{} already had value ({}) for producer ({}); but that producer has just been added", new Object[] {this, vo, producer}); - } - } - } - - @Override - protected Predicate<?> getDefaultValueFilter() { - if (getConfig(EXCLUDE_BLANK)) - return StringPredicates.isNonBlank(); - else - return Predicates.alwaysTrue(); - } - - @Override - protected void onProducerRemoved(Entity producer) { - values.remove(producer); - onUpdated(); - } - - @Override - public void onEvent(SensorEvent<T> event) { - Entity e = event.getSource(); - synchronized (values) { - if (values.containsKey(e)) { - values.put(e, event.getValue()); - } else { - if (LOG.isDebugEnabled()) LOG.debug("{} received event for unknown producer ({}); presumably that producer has recently been removed", this, e); - } - } - onUpdated(); - } - - protected void onUpdated() { - try { - emit(targetSensor, compute()); - } catch (Throwable t) { - LOG.warn("Error calculating and setting aggregate for enricher "+this, t); - throw Exceptions.propagate(t); - } - } - - @Override - protected Object compute() { - synchronized (values) { - // TODO Could avoid copying when filter not needed - List<T> vs = MutableList.copyOf(Iterables.filter(values.values(), valueFilter)); - if (transformation==null) return vs; - return transformation.apply(vs); - } - } - - protected Map<Entity, T> copyOfValues() { - // Don't use ImmutableMap, as can contain null values - synchronized (values) { - return Collections.unmodifiableMap(MutableMap.copyOf(values)); - } - } - -}
