http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java ---------------------------------------------------------------------- diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java index 0000000,6c251a7..ac4bef5 mode 000000,100644..100644 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java +++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java @@@ -1,0 -1,917 +1,934 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.brooklyn.core.sensor; + + import static com.google.common.base.Preconditions.checkNotNull; + import groovy.lang.Closure; + + import java.util.Arrays; + import java.util.Collection; + import java.util.Iterator; + import java.util.LinkedList; + import java.util.List; + import java.util.Map; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.Semaphore; + import java.util.concurrent.TimeUnit; + + import javax.annotation.Nullable; + + import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.EntityLocal; + import org.apache.brooklyn.api.mgmt.ExecutionContext; + import org.apache.brooklyn.api.mgmt.SubscriptionHandle; + import org.apache.brooklyn.api.mgmt.Task; + import org.apache.brooklyn.api.mgmt.TaskAdaptable; + import org.apache.brooklyn.api.mgmt.TaskFactory; + import org.apache.brooklyn.api.sensor.AttributeSensor; + import org.apache.brooklyn.api.sensor.SensorEvent; + import org.apache.brooklyn.api.sensor.SensorEventListener; + import org.apache.brooklyn.config.ConfigKey; + import org.apache.brooklyn.core.entity.Attributes; + import org.apache.brooklyn.core.entity.Entities; + import org.apache.brooklyn.core.entity.EntityInternal; + import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; + import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; + import org.apache.brooklyn.util.collections.CollectionFunctionals; + import org.apache.brooklyn.util.collections.MutableList; + import org.apache.brooklyn.util.collections.MutableMap; + import org.apache.brooklyn.util.core.task.BasicExecutionContext; + import org.apache.brooklyn.util.core.task.BasicTask; + import org.apache.brooklyn.util.core.task.DeferredSupplier; + import org.apache.brooklyn.util.core.task.DynamicTasks; + import org.apache.brooklyn.util.core.task.ParallelTask; + import org.apache.brooklyn.util.core.task.TaskInternal; + import org.apache.brooklyn.util.core.task.Tasks; + import org.apache.brooklyn.util.core.task.ValueResolver; + import org.apache.brooklyn.util.exceptions.CompoundRuntimeException; + import org.apache.brooklyn.util.exceptions.Exceptions; + import org.apache.brooklyn.util.exceptions.NotManagedException; + import org.apache.brooklyn.util.exceptions.RuntimeTimeoutException; + import org.apache.brooklyn.util.groovy.GroovyJavaMethods; + import org.apache.brooklyn.util.guava.Functionals; + import org.apache.brooklyn.util.guava.Maybe; + import org.apache.brooklyn.util.text.StringFunctions; + import org.apache.brooklyn.util.text.Strings; + import org.apache.brooklyn.util.time.CountdownTimer; + import org.apache.brooklyn.util.time.Duration; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import com.google.common.annotations.Beta; + import com.google.common.base.Function; + import com.google.common.base.Functions; + import com.google.common.base.Predicate; + import com.google.common.base.Predicates; + import com.google.common.base.Throwables; + import com.google.common.collect.ImmutableList; + import com.google.common.collect.Iterables; + import com.google.common.collect.Lists; + + /** Conveniences for making tasks which run in entity {@link ExecutionContext}s, subscribing to attributes from other entities, possibly transforming those; + * these {@link Task} instances are typically passed in {@link EntityLocal#setConfig(ConfigKey, Object)}. + * <p> + * If using a lot it may be useful to: + * <pre> + * {@code + * import static org.apache.brooklyn.core.sensor.DependentConfiguration.*; + * } + * </pre> + */ + public class DependentConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(DependentConfiguration.class); + + //not instantiable, only a static helper + private DependentConfiguration() {} + + /** + * Default readiness is Groovy truth. + * + * @see #attributeWhenReady(Entity, AttributeSensor, Predicate) + */ + public static <T> Task<T> attributeWhenReady(Entity source, AttributeSensor<T> sensor) { + return attributeWhenReady(source, sensor, GroovyJavaMethods.truthPredicate()); + } + + public static <T> Task<T> attributeWhenReady(Entity source, AttributeSensor<T> sensor, Closure<Boolean> ready) { + Predicate<Object> readyPredicate = (ready != null) ? GroovyJavaMethods.<Object>predicateFromClosure(ready) : GroovyJavaMethods.truthPredicate(); + return attributeWhenReady(source, sensor, readyPredicate); + } + + /** returns an unsubmitted {@link Task} which blocks until the given sensor on the given source entity gives a value that satisfies ready, then returns that value; + * particular useful in Entity configuration where config will block until Tasks have a value + */ + public static <T> Task<T> attributeWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready) { + Builder<T, T> builder = builder().attributeWhenReady(source, sensor); + if (ready != null) builder.readiness(ready); + return builder.build(); + + } + + public static <T,V> Task<V> attributePostProcessedWhenReady(Entity source, AttributeSensor<T> sensor, Closure<Boolean> ready, Closure<V> postProcess) { + Predicate<? super T> readyPredicate = (ready != null) ? GroovyJavaMethods.predicateFromClosure(ready) : GroovyJavaMethods.truthPredicate(); + Function<? super T, V> postProcessFunction = GroovyJavaMethods.<T,V>functionFromClosure(postProcess); + return attributePostProcessedWhenReady(source, sensor, readyPredicate, postProcessFunction); + } + + public static <T,V> Task<V> attributePostProcessedWhenReady(Entity source, AttributeSensor<T> sensor, Closure<V> postProcess) { + return attributePostProcessedWhenReady(source, sensor, GroovyJavaMethods.truthPredicate(), GroovyJavaMethods.<T,V>functionFromClosure(postProcess)); + } + + public static <T> Task<T> valueWhenAttributeReady(Entity source, AttributeSensor<T> sensor, T value) { + return DependentConfiguration.<T,T>attributePostProcessedWhenReady(source, sensor, GroovyJavaMethods.truthPredicate(), Functions.constant(value)); + } + + public static <T,V> Task<V> valueWhenAttributeReady(Entity source, AttributeSensor<T> sensor, Function<? super T,V> valueProvider) { + return attributePostProcessedWhenReady(source, sensor, GroovyJavaMethods.truthPredicate(), valueProvider); + } + + public static <T,V> Task<V> valueWhenAttributeReady(Entity source, AttributeSensor<T> sensor, Closure<V> valueProvider) { + return attributePostProcessedWhenReady(source, sensor, GroovyJavaMethods.truthPredicate(), valueProvider); + } + + public static <T,V> Task<V> attributePostProcessedWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready, final Closure<V> postProcess) { + return attributePostProcessedWhenReady(source, sensor, ready, GroovyJavaMethods.<T,V>functionFromClosure(postProcess)); + } + + @SuppressWarnings("unchecked") + public static <T,V> Task<V> attributePostProcessedWhenReady(final Entity source, final AttributeSensor<T> sensor, final Predicate<? super T> ready, final Function<? super T,V> postProcess) { + Builder<T,T> builder1 = DependentConfiguration.builder().attributeWhenReady(source, sensor); + // messy generics here to support null postProcess; would be nice to disallow that here + Builder<T,V> builder; + if (postProcess != null) { + builder = builder1.postProcess(postProcess); + } else { + builder = (Builder<T,V>)builder1; + } + if (ready != null) builder.readiness(ready); + + return builder.build(); + } + + public static <T> T waitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready) { + return waitInTaskForAttributeReady(source, sensor, ready, ImmutableList.<AttributeAndSensorCondition<?>>of()); + } + + public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition<?>> abortConditions) { + String blockingDetails = "Waiting for ready from "+source+" "+sensor+" (subscription)"; + return waitInTaskForAttributeReady(source, sensor, ready, abortConditions, blockingDetails); + } + + // TODO would be nice to have an easy semantics for whenServiceUp (cf DynamicWebAppClusterImpl.whenServiceUp) + + public static <T> T waitInTaskForAttributeReady(final Entity source, final AttributeSensor<T> sensor, Predicate<? super T> ready, List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) { + return new WaitInTaskForAttributeReady<T,T>(source, sensor, ready, abortConditions, blockingDetails).call(); + } + + protected static class WaitInTaskForAttributeReady<T,V> implements Callable<V> { + + /* This is a change since before Oct 2014. Previously it would continue to poll, + * (maybe finding a different error) if the target entity becomes unmanaged. + * Now it actively checks unmanaged by default, and still throws although it might + * now find a different problem. */ + private final static boolean DEFAULT_IGNORE_UNMANAGED = false; + + protected final Entity source; + protected final AttributeSensor<T> sensor; + protected final Predicate<? super T> ready; + protected final List<AttributeAndSensorCondition<?>> abortSensorConditions; + protected final String blockingDetails; + protected final Function<? super T,? extends V> postProcess; + protected final Duration timeout; + protected final Maybe<V> onTimeout; + protected final boolean ignoreUnmanaged; + protected final Maybe<V> onUnmanaged; + // TODO onError Continue / Throw / Return(V) + + protected WaitInTaskForAttributeReady(Builder<T, V> builder) { + this.source = builder.source; + this.sensor = builder.sensor; + this.ready = builder.readiness; + this.abortSensorConditions = builder.abortSensorConditions; + this.blockingDetails = builder.blockingDetails; + this.postProcess = builder.postProcess; + this.timeout = builder.timeout; + this.onTimeout = builder.onTimeout; + this.ignoreUnmanaged = builder.ignoreUnmanaged; + this.onUnmanaged = builder.onUnmanaged; + } + + private WaitInTaskForAttributeReady(Entity source, AttributeSensor<T> sensor, Predicate<? super T> ready, + List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) { + this.source = source; + this.sensor = sensor; + this.ready = ready; + this.abortSensorConditions = abortConditions; + this.blockingDetails = blockingDetails; + + this.timeout = Duration.PRACTICALLY_FOREVER; + this.onTimeout = Maybe.absent(); + this.ignoreUnmanaged = DEFAULT_IGNORE_UNMANAGED; + this.onUnmanaged = Maybe.absent(); + this.postProcess = null; + } + + @SuppressWarnings("unchecked") + protected V postProcess(T value) { + if (this.postProcess!=null) return postProcess.apply(value); + // if no post-processing assume the types are correct + return (V) value; + } + + protected boolean ready(T value) { + if (ready!=null) return ready.apply(value); + return GroovyJavaMethods.truth(value); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public V call() { + T value = source.getAttribute(sensor); + + // return immediately if either the ready predicate or the abort conditions hold + if (ready(value)) return postProcess(value); + + final List<Exception> abortionExceptions = Lists.newCopyOnWriteArrayList(); + long start = System.currentTimeMillis(); + + for (AttributeAndSensorCondition abortCondition : abortSensorConditions) { + Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor); + if (abortCondition.predicate.apply(abortValue)) { + abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); + } + } + if (abortionExceptions.size() > 0) { + throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions); + } + + TaskInternal<?> current = (TaskInternal<?>) Tasks.current(); + if (current == null) throw new IllegalStateException("Should only be invoked in a running task"); + Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current); + if (entity == null) throw new IllegalStateException("Should only be invoked in a running task with an entity tag; "+ + current+" has no entity tag ("+current.getStatusDetail(false)+")"); + + final LinkedList<T> publishedValues = new LinkedList<T>(); + final Semaphore semaphore = new Semaphore(0); // could use Exchanger + SubscriptionHandle subscription = null; + List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList(); + + try { + subscription = entity.subscriptions().subscribe(source, sensor, new SensorEventListener<T>() { + @Override public void onEvent(SensorEvent<T> event) { + synchronized (publishedValues) { publishedValues.add(event.getValue()); } + semaphore.release(); + }}); + for (final AttributeAndSensorCondition abortCondition : abortSensorConditions) { + abortSubscriptions.add(entity.subscriptions().subscribe(abortCondition.source, abortCondition.sensor, new SensorEventListener<Object>() { + @Override public void onEvent(SensorEvent<Object> event) { + if (abortCondition.predicate.apply(event.getValue())) { + abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); + semaphore.release(); + } + }})); + Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor); + if (abortCondition.predicate.apply(abortValue)) { + abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+" -> "+abortCondition.sensor)); + } + } + if (abortionExceptions.size() > 0) { + throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions); + } + + CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : null; + Duration maxPeriod = ValueResolver.PRETTY_QUICK_WAIT; + Duration nextPeriod = ValueResolver.REAL_QUICK_PERIOD; + while (true) { + // check the source on initial run (could be done outside the loop) + // and also (optionally) on each iteration in case it is more recent + value = source.getAttribute(sensor); + if (ready(value)) break; + + if (timer!=null) { + if (timer.getDurationRemaining().isShorterThan(nextPeriod)) { + nextPeriod = timer.getDurationRemaining(); + } + if (timer.isExpired()) { + if (onTimeout.isPresent()) return onTimeout.get(); + throw new RuntimeTimeoutException("Unsatisfied after "+Duration.sinceUtc(start)); + } + } + + String prevBlockingDetails = current.setBlockingDetails(blockingDetails); + try { + if (semaphore.tryAcquire(nextPeriod.toMilliseconds(), TimeUnit.MILLISECONDS)) { + // immediately release so we are available for the next check + semaphore.release(); + // if other permits have been made available (e.g. multiple notifications) drain them all as no point running multiple times + semaphore.drainPermits(); + } + } finally { + current.setBlockingDetails(prevBlockingDetails); + } + + // check any subscribed values which have come in first + while (true) { + synchronized (publishedValues) { + if (publishedValues.isEmpty()) break; + value = publishedValues.pop(); + } + if (ready(value)) break; + } + + // if unmanaged then ignore the other abort conditions + if (!ignoreUnmanaged && Entities.isNoLongerManaged(entity)) { + if (onUnmanaged.isPresent()) return onUnmanaged.get(); + throw new NotManagedException(entity); + } + + if (abortionExceptions.size() > 0) { + throw new CompoundRuntimeException("Aborted waiting for ready from "+source+" "+sensor, abortionExceptions); + } + + nextPeriod = nextPeriod.times(2).upperBound(maxPeriod); + } + if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}", sensor, source); + return postProcess(value); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } finally { + if (subscription != null) { + entity.subscriptions().unsubscribe(subscription); + } + for (SubscriptionHandle handle : abortSubscriptions) { + entity.subscriptions().unsubscribe(handle); + } + } + } + } + + /** + * Returns a {@link Task} which blocks until the given job returns, then returns the value of that job. + * + * @deprecated since 0.7; code will be moved into test utilities + */ + @Deprecated + public static <T> Task<T> whenDone(Callable<T> job) { + return new BasicTask<T>(MutableMap.of("tag", "whenDone", "displayName", "waiting for job"), job); + } + + /** + * Returns a {@link Task} which waits for the result of first parameter, then applies the function in the second + * parameter to it, returning that result. + * + * Particular useful in Entity configuration where config will block until Tasks have completed, + * allowing for example an {@link #attributeWhenReady(Entity, AttributeSensor, Predicate)} expression to be + * passed in the first argument then transformed by the function in the second argument to generate + * the value that is used for the configuration + */ + public static <U,T> Task<T> transform(final Task<U> task, final Function<U,T> transformer) { + return transform(MutableMap.of("displayName", "transforming "+task), task, transformer); + } + + /** @see #transform(Task, Function) */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static <U,T> Task<T> transform(Task<U> task, Closure transformer) { + return transform(task, GroovyJavaMethods.functionFromClosure(transformer)); + } + + /** @see #transform(Task, Function) */ + @SuppressWarnings({ "rawtypes" }) + public static <U,T> Task<T> transform(final Map flags, final TaskAdaptable<U> task, final Function<U,T> transformer) { + return new BasicTask<T>(flags, new Callable<T>() { + public T call() throws Exception { + if (!task.asTask().isSubmitted()) { + BasicExecutionContext.getCurrentExecutionContext().submit(task); + } + return transformer.apply(task.asTask().get()); + }}); + } + + /** Returns a task which waits for multiple other tasks (submitting if necessary) + * and performs arbitrary computation over the List of results. + * @see #transform(Task, Function) but note argument order is reversed (counterintuitive) to allow for varargs */ + public static <U,T> Task<T> transformMultiple(Function<List<U>,T> transformer, @SuppressWarnings("unchecked") TaskAdaptable<U> ...tasks) { + return transformMultiple(MutableMap.of("displayName", "transforming multiple"), transformer, tasks); + } + + /** @see #transformMultiple(Function, TaskAdaptable...) */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static <U,T> Task<T> transformMultiple(Closure transformer, TaskAdaptable<U> ...tasks) { + return transformMultiple(GroovyJavaMethods.functionFromClosure(transformer), tasks); + } + + /** @see #transformMultiple(Function, TaskAdaptable...) */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static <U,T> Task<T> transformMultiple(Map flags, Closure transformer, TaskAdaptable<U> ...tasks) { + return transformMultiple(flags, GroovyJavaMethods.functionFromClosure(transformer), tasks); + } + + /** @see #transformMultiple(Function, TaskAdaptable...) */ + @SuppressWarnings({ "rawtypes" }) + public static <U,T> Task<T> transformMultiple(Map flags, final Function<List<U>,T> transformer, @SuppressWarnings("unchecked") TaskAdaptable<U> ...tasks) { + return transformMultiple(flags, transformer, Arrays.asList(tasks)); + } + @SuppressWarnings({ "rawtypes" }) + public static <U,T> Task<T> transformMultiple(Map flags, final Function<List<U>,T> transformer, Collection<? extends TaskAdaptable<U>> tasks) { + if (tasks.size()==1) { + return transform(flags, Iterables.getOnlyElement(tasks), new Function<U,T>() { + @Override + @Nullable + public T apply(@Nullable U input) { + return transformer.apply(ImmutableList.of(input)); + } + }); + } + return transform(flags, new ParallelTask<U>(tasks), transformer); + } + + + /** Method which returns a Future containing a string formatted using String.format, + * where the arguments can be normal objects or tasks; + * tasks will be waited on (submitted if necessary) and their results substituted in the call + * to String.format. + * <p> + * Example: + * <pre> + * {@code + * setConfig(URL, DependentConfiguration.formatString("%s:%s", + * DependentConfiguration.attributeWhenReady(target, Target.HOSTNAME), + * DependentConfiguration.attributeWhenReady(target, Target.PORT) ) ); + * } + * </pre> + */ + @SuppressWarnings("unchecked") + public static Task<String> formatString(final String spec, final Object ...args) { + List<TaskAdaptable<Object>> taskArgs = Lists.newArrayList(); + for (Object arg: args) { + if (arg instanceof TaskAdaptable) taskArgs.add((TaskAdaptable<Object>)arg); + else if (arg instanceof TaskFactory) taskArgs.add( ((TaskFactory<TaskAdaptable<Object>>)arg).newTask() ); + } + + return transformMultiple( + MutableMap.<String,String>of("displayName", "formatting '"+spec+"' with "+taskArgs.size()+" task"+(taskArgs.size()!=1?"s":"")), + new Function<List<Object>, String>() { + @Override public String apply(List<Object> input) { + Iterator<?> tri = input.iterator(); + Object[] vv = new Object[args.length]; + int i=0; + for (Object arg : args) { + if (arg instanceof TaskAdaptable || arg instanceof TaskFactory) vv[i] = tri.next(); + else if (arg instanceof DeferredSupplier) vv[i] = ((DeferredSupplier<?>) arg).get(); + else vv[i] = arg; + i++; + } + return String.format(spec, vv); + }}, + taskArgs); + } + + public static Task<String> regexReplacement(Object source, Object pattern, Object replacement) { + List<TaskAdaptable<Object>> taskArgs = getTaskAdaptable(source, pattern, replacement); + Function<List<Object>, String> transformer = new RegexTransformerString(source, pattern, replacement); + return transformMultiple( + MutableMap.of("displayName", String.format("creating regex replacement function (%s:%s)", pattern, replacement)), + transformer, + taskArgs + ); + } + + public static Task<Function<String, String>> regexReplacement(Object pattern, Object replacement) { + List<TaskAdaptable<Object>> taskArgs = getTaskAdaptable(pattern, replacement); + Function<List<Object>, Function<String, String>> transformer = new RegexTransformerFunction(pattern, replacement); + return transformMultiple( + MutableMap.of("displayName", String.format("creating regex replacement function (%s:%s)", pattern, replacement)), + transformer, + taskArgs + ); + } + ++ @SuppressWarnings("unchecked") + private static List<TaskAdaptable<Object>> getTaskAdaptable(Object... args){ + List<TaskAdaptable<Object>> taskArgs = Lists.newArrayList(); + for (Object arg: args) { + if (arg instanceof TaskAdaptable) { + taskArgs.add((TaskAdaptable<Object>)arg); + } else if (arg instanceof TaskFactory) { + taskArgs.add(((TaskFactory<TaskAdaptable<Object>>)arg).newTask()); + } + } + return taskArgs; + } + + public static class RegexTransformerString implements Function<List<Object>, String> { + + private final Object source; + private final Object pattern; + private final Object replacement; + + public RegexTransformerString(Object source, Object pattern, Object replacement){ + this.source = source; + this.pattern = pattern; + this.replacement = replacement; + } + + @Nullable + @Override + public String apply(@Nullable List<Object> input) { + Iterator<?> taskArgsIterator = input.iterator(); + String resolvedSource = resolveArgument(source, taskArgsIterator); + String resolvedPattern = resolveArgument(pattern, taskArgsIterator); + String resolvedReplacement = resolveArgument(replacement, taskArgsIterator); + return new StringFunctions.RegexReplacer(resolvedPattern, resolvedReplacement).apply(resolvedSource); + } + } + + @Beta + public static class RegexTransformerFunction implements Function<List<Object>, Function<String, String>> { + + private final Object pattern; + private final Object replacement; + + public RegexTransformerFunction(Object pattern, Object replacement){ + this.pattern = pattern; + this.replacement = replacement; + } + + @Override + public Function<String, String> apply(List<Object> input) { + Iterator<?> taskArgsIterator = input.iterator(); + return new StringFunctions.RegexReplacer(resolveArgument(pattern, taskArgsIterator), resolveArgument(replacement, taskArgsIterator)); + } + + } + + /** + * Resolves the argument as follows: + * + * If the argument is a DeferredSupplier, we will block and wait for it to resolve. If the argument is TaskAdaptable or TaskFactory, + * we will assume that the resolved task has been queued on the {@code taskArgsIterator}, otherwise the argument has already been resolved. + */ + private static String resolveArgument(Object argument, Iterator<?> taskArgsIterator) { + Object resolvedArgument; + if (argument instanceof TaskAdaptable) { + resolvedArgument = taskArgsIterator.next(); + } else if (argument instanceof DeferredSupplier) { + resolvedArgument = ((DeferredSupplier<?>) argument).get(); + } else { + resolvedArgument = argument; + } + return String.valueOf(resolvedArgument); + } + + + /** returns a task for parallel execution returning a list of values for the given sensor for the given entity list, + * optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */ + public static <T> Task<List<T>> listAttributesWhenReady(AttributeSensor<T> sensor, Iterable<Entity> entities) { + return listAttributesWhenReady(sensor, entities, GroovyJavaMethods.truthPredicate()); + } + + public static <T> Task<List<T>> listAttributesWhenReady(AttributeSensor<T> sensor, Iterable<Entity> entities, Closure<Boolean> readiness) { + Predicate<Object> readinessPredicate = (readiness != null) ? GroovyJavaMethods.<Object>predicateFromClosure(readiness) : GroovyJavaMethods.truthPredicate(); + return listAttributesWhenReady(sensor, entities, readinessPredicate); + } + + /** returns a task for parallel execution returning a list of values of the given sensor list on the given entity, + * optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */ + public static <T> Task<List<T>> listAttributesWhenReady(final AttributeSensor<T> sensor, Iterable<Entity> entities, Predicate<? super T> readiness) { + if (readiness == null) readiness = GroovyJavaMethods.truthPredicate(); + return builder().attributeWhenReadyFromMultiple(entities, sensor, readiness).build(); + } + + /** @see #waitForTask(Task, Entity, String) */ + public static <T> T waitForTask(Task<T> t, Entity context) throws InterruptedException { + return waitForTask(t, context, null); + } + + /** blocks until the given task completes, submitting if necessary, returning the result of that task; + * optional contextMessage is available in status if this is running in a task + */ + @SuppressWarnings("unchecked") + public static <T> T waitForTask(Task<T> t, Entity context, String contextMessage) throws InterruptedException { + try { + return (T) Tasks.resolveValue(t, Object.class, ((EntityInternal)context).getExecutionContext(), contextMessage); + } catch (ExecutionException e) { + throw Throwables.propagate(e); + } + } + + public static class AttributeAndSensorCondition<T> { + protected final Entity source; + protected final AttributeSensor<T> sensor; + protected final Predicate<? super T> predicate; + + public AttributeAndSensorCondition(Entity source, AttributeSensor<T> sensor, Predicate<? super T> predicate) { + this.source = checkNotNull(source, "source"); + this.sensor = checkNotNull(sensor, "sensor"); + this.predicate = checkNotNull(predicate, "predicate"); + } + } + + public static ProtoBuilder builder() { + return new ProtoBuilder(); + } + + /** + * Builder for producing variants of attributeWhenReady. + */ + @Beta + public static class ProtoBuilder { + /** - * Will wait for the attribute on the given entity. - * If that entity reports {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE} then it will abort. ++ * Will wait for the attribute on the given entity, with default behaviour: ++ * If that entity reports {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE} then it will abort; ++ * If that entity is stopping or destroyed (see {@link Builder#timeoutIfStoppingOrDestroyed(Duration)}), ++ * then it will timeout after 1 minute. + */ + public <T2> Builder<T2,T2> attributeWhenReady(Entity source, AttributeSensor<T2> sensor) { - return new Builder<T2,T2>(source, sensor).abortIfOnFire(); ++ return new Builder<T2,T2>(source, sensor).abortIfOnFire().timeoutIfStoppingOrDestroyed(Duration.ONE_MINUTE); + } + + /** + * Will wait for the attribute on the given entity, not aborting when it goes {@link Lifecycle#ON_FIRE}. + */ + public <T2> Builder<T2,T2> attributeWhenReadyAllowingOnFire(Entity source, AttributeSensor<T2> sensor) { + return new Builder<T2,T2>(source, sensor); + } + + /** Constructs a builder for task for parallel execution returning a list of values of the given sensor list on the given entity, + * optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */ + @Beta + public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor) { + return attributeWhenReadyFromMultiple(sources, sensor, GroovyJavaMethods.truthPredicate()); + } + /** As {@link #attributeWhenReadyFromMultiple(Iterable, AttributeSensor)} with an explicit readiness test. */ + @Beta + public <T> MultiBuilder<T, T, List<T>> attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) { + return new MultiBuilder<T, T, List<T>>(sources, sensor, readiness); + } + } + + /** + * Builder for producing variants of attributeWhenReady. + */ + public static class Builder<T,V> { + protected Entity source; + protected AttributeSensor<T> sensor; + protected Predicate<? super T> readiness; + protected Function<? super T, ? extends V> postProcess; + protected List<AttributeAndSensorCondition<?>> abortSensorConditions = Lists.newArrayList(); + protected String blockingDetails; + protected Duration timeout; + protected Maybe<V> onTimeout; + protected boolean ignoreUnmanaged = WaitInTaskForAttributeReady.DEFAULT_IGNORE_UNMANAGED; + protected Maybe<V> onUnmanaged; + + protected Builder(Entity source, AttributeSensor<T> sensor) { + this.source = source; + this.sensor = sensor; + } + + /** + * Will wait for the attribute on the given entity. + * If that entity report {@link Lifecycle#ON_FIRE} for its {@link Attributes#SERVICE_STATE_ACTUAL} then it will abort. + * @deprecated since 0.7.0 use {@link DependentConfiguration#builder()} then {@link ProtoBuilder#attributeWhenReady(Entity, AttributeSensor)} then {@link #abortIfOnFire()} + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <T2> Builder<T2,T2> attributeWhenReady(Entity source, AttributeSensor<T2> sensor) { + this.source = checkNotNull(source, "source"); + this.sensor = (AttributeSensor) checkNotNull(sensor, "sensor"); + abortIfOnFire(); + return (Builder<T2, T2>) this; + } + public Builder<T,V> readiness(Closure<Boolean> val) { + this.readiness = GroovyJavaMethods.predicateFromClosure(checkNotNull(val, "val")); + return this; + } + public Builder<T,V> readiness(Predicate<? super T> val) { + this.readiness = checkNotNull(val, "ready"); + return this; + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <V2> Builder<T,V2> postProcess(Closure<V2> val) { + this.postProcess = (Function) GroovyJavaMethods.<T,V2>functionFromClosure(checkNotNull(val, "postProcess")); + return (Builder<T,V2>) this; + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public <V2> Builder<T,V2> postProcess(final Function<? super T, V2> val) { + this.postProcess = (Function) checkNotNull(val, "postProcess"); + return (Builder<T,V2>) this; + } + public <T2> Builder<T,V> abortIf(Entity source, AttributeSensor<T2> sensor) { + return abortIf(source, sensor, GroovyJavaMethods.truthPredicate()); + } + public <T2> Builder<T,V> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate) { + abortSensorConditions.add(new AttributeAndSensorCondition<T2>(source, sensor, predicate)); + return this; + } ++ /** Causes the depender to abort immediately if {@link Attributes#SERVICE_STATE_ACTUAL} ++ * is {@link Lifecycle#ON_FIRE}. */ + public Builder<T,V> abortIfOnFire() { + abortIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.ON_FIRE)); + return this; + } ++ /** Causes the depender to timeout after the given time if {@link Attributes#SERVICE_STATE_ACTUAL} ++ * is one of {@link Lifecycle#STOPPING}, {@link Lifecycle#STOPPED}, or {@link Lifecycle#DESTROYED}. */ ++ public Builder<T,V> timeoutIfStoppingOrDestroyed(Duration time) { ++ timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.STOPPING), time); ++ timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.STOPPED), time); ++ timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL, Predicates.equalTo(Lifecycle.DESTROYED), time); ++ return this; ++ } + public Builder<T,V> blockingDetails(String val) { + blockingDetails = val; + return this; + } + /** specifies an optional timeout; by default it waits forever, or until unmanaged or other abort condition */ + public Builder<T,V> timeout(Duration val) { + timeout = val; + return this; + } ++ /** specifies the supplied timeout if the condition is met */ ++ public <T2> Builder<T,V> timeoutIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate, Duration val) { ++ if (predicate.apply(source.sensors().get(sensor))) timeout(val); ++ return this; ++ } + public Builder<T,V> onTimeoutReturn(V val) { + onTimeout = Maybe.of(val); + return this; + } + public Builder<T,V> onTimeoutThrow() { + onTimeout = Maybe.<V>absent(); + return this; + } + public Builder<T,V> onUnmanagedReturn(V val) { + onUnmanaged = Maybe.of(val); + return this; + } + public Builder<T,V> onUnmanagedThrow() { + onUnmanaged = Maybe.<V>absent(); + return this; + } + /** @since 0.7.0 included in case old behaviour of not checking whether the entity is managed is required + * (I can't see why it is; polling will likely give errors, once it is unmanaged this will never completed, + * and before management the current code will continue, so long as there are no other errors) */ @Deprecated + public Builder<T,V> onUnmanagedContinue() { + ignoreUnmanaged = true; + return this; + } + /** take advantage of the fact that this builder can build multiple times, allowing subclasses + * to change the source along the way */ + protected Builder<T,V> source(Entity source) { + this.source = source; + return this; + } + /** as {@link #source(Entity)} */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected Builder<T,V> sensor(AttributeSensor<? extends T> sensor) { + this.sensor = (AttributeSensor) sensor; + return this; + } + public Task<V> build() { + validate(); + + return Tasks.<V>builder().dynamic(false) + .displayName("waiting on "+sensor.getName()) + .description("Waiting on sensor "+sensor.getName()+" from "+source) + .tag("attributeWhenReady") + .body(new WaitInTaskForAttributeReady<T,V>(this)) + .build(); + } + + public V runNow() { + validate(); + return new WaitInTaskForAttributeReady<T,V>(this).call(); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void validate() { + checkNotNull(source, "Entity source"); + checkNotNull(sensor, "Sensor"); + if (readiness == null) readiness = GroovyJavaMethods.truthPredicate(); + if (postProcess == null) postProcess = (Function) Functions.identity(); + } + } + + /** + * Builder for producing variants of attributeWhenReady. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Beta + public static class MultiBuilder<T, V, V2> { + protected final String name; + protected final String descriptionBase; + protected final Builder<T,V> builder; + // if desired, the use of this multiSource could allow different conditions; + // but probably an easier API just for the caller to build the parallel task + protected final List<AttributeAndSensorCondition<?>> multiSource = Lists.newArrayList(); + protected Function<? super List<V>, V2> postProcessFromMultiple; + + /** returns a task for parallel execution returning a list of values of the given sensor list on the given entity, + * optionally when the values satisfy a given readiness predicate (defaulting to groovy truth if not supplied) */ + @Beta + protected MultiBuilder(Iterable<? extends Entity> sources, AttributeSensor<T> sensor) { + this(sources, sensor, GroovyJavaMethods.truthPredicate()); + } + @Beta + protected MultiBuilder(Iterable<? extends Entity> sources, AttributeSensor<T> sensor, Predicate<? super T> readiness) { + builder = new Builder<T,V>(null, sensor); + builder.readiness(readiness); + + for (Entity s : checkNotNull(sources, "sources")) { + multiSource.add(new AttributeAndSensorCondition<T>(s, sensor, readiness)); + } + this.name = "waiting on "+sensor.getName(); + this.descriptionBase = "waiting on "+sensor.getName()+" "+readiness + +" from "+Iterables.size(sources)+" entit"+Strings.ies(sources); + } + + /** Apply post-processing to the entire list of results */ + public <V2b> MultiBuilder<T, V, V2b> postProcessFromMultiple(final Function<? super List<V>, V2b> val) { + this.postProcessFromMultiple = (Function) checkNotNull(val, "postProcessFromMulitple"); + return (MultiBuilder<T,V, V2b>) this; + } + /** Apply post-processing to the entire list of results + * See {@link CollectionFunctionals#all(Predicate)} and {@link CollectionFunctionals#quorum(org.apache.brooklyn.util.collections.QuorumCheck, Predicate) + * which allow useful arguments. */ + public MultiBuilder<T, V, Boolean> postProcessFromMultiple(final Predicate<? super List<V>> val) { + return postProcessFromMultiple(Functions.forPredicate(val)); + } + + public <V1> MultiBuilder<T, V1, V2> postProcess(Closure<V1> val) { + builder.postProcess(val); + return (MultiBuilder<T, V1, V2>) this; + } + public <V1> MultiBuilder<T, V1, V2> postProcess(final Function<? super T, V1> val) { + builder.postProcess(val); + return (MultiBuilder<T, V1, V2>) this; + } + public <T2> MultiBuilder<T, V, V2> abortIf(Entity source, AttributeSensor<T2> sensor) { + builder.abortIf(source, sensor); + return this; + } + public <T2> MultiBuilder<T, V, V2> abortIf(Entity source, AttributeSensor<T2> sensor, Predicate<? super T2> predicate) { + builder.abortIf(source, sensor, predicate); + return this; + } + public MultiBuilder<T, V, V2> abortIfOnFire() { + builder.abortIfOnFire(); + return this; + } + public MultiBuilder<T, V, V2> blockingDetails(String val) { + builder.blockingDetails(val); + return this; + } + public MultiBuilder<T, V, V2> timeout(Duration val) { + builder.timeout(val); + return this; + } + public MultiBuilder<T, V, V2> onTimeoutReturn(V val) { + builder.onTimeoutReturn(val); + return this; + } + public MultiBuilder<T, V, V2> onTimeoutThrow() { + builder.onTimeoutThrow(); + return this; + } + public MultiBuilder<T, V, V2> onUnmanagedReturn(V val) { + builder.onUnmanagedReturn(val); + return this; + } + public MultiBuilder<T, V, V2> onUnmanagedThrow() { + builder.onUnmanagedThrow(); + return this; + } + + public Task<V2> build() { + List<Task<V>> tasks = MutableList.of(); + for (AttributeAndSensorCondition<?> source: multiSource) { + builder.source(source.source); + builder.sensor((AttributeSensor)source.sensor); + builder.readiness((Predicate)source.predicate); + tasks.add(builder.build()); + } + final Task<List<V>> parallelTask = Tasks.<List<V>>builder().parallel(true).addAll(tasks) + .displayName(name) + .description(descriptionBase+ + (builder.timeout!=null ? ", timeout "+builder.timeout : "")) + .build(); + + if (postProcessFromMultiple == null) { + // V2 should be the right type in normal operations + return (Task<V2>) parallelTask; + } else { + return Tasks.<V2>builder().displayName(name).description(descriptionBase) + .tag("attributeWhenReady") + .body(new Callable<V2>() { + @Override public V2 call() throws Exception { + List<V> prePostProgress = DynamicTasks.queue(parallelTask).get(); + return DynamicTasks.queue( + Tasks.<V2>builder().displayName("post-processing").description("Applying "+postProcessFromMultiple) + .body(Functionals.callable(postProcessFromMultiple, prePostProgress)) + .build()).get(); + } + }) + .build(); + } + } + } + + }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/AbstractTypePlanTransformer.java ---------------------------------------------------------------------- diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/AbstractTypePlanTransformer.java index 0000000,ee49d39..8f671f2 mode 000000,100644..100644 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/AbstractTypePlanTransformer.java +++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/AbstractTypePlanTransformer.java @@@ -1,0 -1,142 +1,137 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.brooklyn.core.typereg; + + import org.apache.brooklyn.api.internal.AbstractBrooklynObjectSpec; + import org.apache.brooklyn.api.mgmt.ManagementContext; + import org.apache.brooklyn.api.typereg.RegisteredType; + import org.apache.brooklyn.api.typereg.RegisteredTypeLoadingContext; + import org.apache.brooklyn.util.exceptions.Exceptions; ++import org.apache.brooklyn.util.guava.Maybe; + import org.apache.brooklyn.util.javalang.JavaClassNames; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * Convenience supertype for {@link BrooklynTypePlanTransformer} instances. + * <p> + * This supplies a default {@link #scoreForType(RegisteredType, RegisteredTypeLoadingContext)} + * method which returns 1 if the format code matches, + * and otherwise branches to two methods {@link #scoreForNullFormat(Object, RegisteredType, RegisteredTypeLoadingContext)} + * and {@link #scoreForNonmatchingNonnullFormat(String, Object, RegisteredType, RegisteredTypeLoadingContext)} + * which subclasses can implement. (Often the implementation of the latter is 0.) + */ + public abstract class AbstractTypePlanTransformer implements BrooklynTypePlanTransformer { + + private static final Logger log = LoggerFactory.getLogger(AbstractTypePlanTransformer.class); + + protected ManagementContext mgmt; + + @Override + public void setManagementContext(ManagementContext mgmt) { + this.mgmt = mgmt; + } + + private final String format; + private final String formatName; + private final String formatDescription; + + protected AbstractTypePlanTransformer(String format, String formatName, String formatDescription) { + this.format = format; + this.formatName = formatName; + this.formatDescription = formatDescription; + } + + @Override + public String getFormatCode() { + return format; + } + + @Override + public String getFormatName() { + return formatName; + } + + @Override + public String getFormatDescription() { + return formatDescription; + } + + @Override + public String toString() { + return getFormatCode()+":"+JavaClassNames.simpleClassName(this); + } + + @Override + public double scoreForType(RegisteredType type, RegisteredTypeLoadingContext context) { + if (getFormatCode().equals(type.getPlan().getPlanFormat())) return 1; + if (type.getPlan().getPlanFormat()==null) + return scoreForNullFormat(type.getPlan().getPlanData(), type, context); + else + return scoreForNonmatchingNonnullFormat(type.getPlan().getPlanFormat(), type.getPlan().getPlanData(), type, context); + } + + protected abstract double scoreForNullFormat(Object planData, RegisteredType type, RegisteredTypeLoadingContext context); + protected abstract double scoreForNonmatchingNonnullFormat(String planFormat, Object planData, RegisteredType type, RegisteredTypeLoadingContext context); + + /** delegates to more specific abstract create methods, + * and performs common validation and customisation of the items created. + * <p> + * this includes: + * <li> setting the {@link AbstractBrooklynObjectSpec#catalogItemId(String)} + */ + @Override + public Object create(final RegisteredType type, final RegisteredTypeLoadingContext context) { + try { - return validate(new RegisteredTypeKindVisitor<Object>() { ++ return tryValidate(new RegisteredTypeKindVisitor<Object>() { + @Override protected Object visitSpec() { + try { + AbstractBrooklynObjectSpec<?, ?> result = createSpec(type, context); + result.catalogItemId(type.getId()); + return result; + } catch (Exception e) { throw Exceptions.propagate(e); } + } + @Override protected Object visitBean() { + try { + return createBean(type, context); + } catch (Exception e) { throw Exceptions.propagate(e); } + } + - }.visit(type.getKind()), type, context); ++ }.visit(type.getKind()), type, context).get(); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + if (!(e instanceof UnsupportedTypePlanException)) { + log.debug("Could not instantiate "+type+" (rethrowing): "+Exceptions.collapseText(e)); + } + throw Exceptions.propagate(e); + } + } + + /** Validates the object. Subclasses may do further validation based on the context. + * @throw UnsupportedTypePlanException if we want to quietly abandon this, any other exception to report the problem, when validation fails + * @return the created object for fluent usage */ - protected <T> T validate(T createdObject, RegisteredType type, RegisteredTypeLoadingContext constraint) { - if (createdObject==null) return null; - try { - return RegisteredTypes.validate(createdObject, type, constraint); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - throw new IllegalStateException("Created incompatible object: "+Exceptions.collapseText(e), e); - } ++ protected <T> Maybe<T> tryValidate(T createdObject, RegisteredType type, RegisteredTypeLoadingContext constraint) { ++ return RegisteredTypes.tryValidate(createdObject, type, constraint); + } + + protected abstract AbstractBrooklynObjectSpec<?,?> createSpec(RegisteredType type, RegisteredTypeLoadingContext context) throws Exception; + + protected abstract Object createBean(RegisteredType type, RegisteredTypeLoadingContext context) throws Exception; + + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/BasicBrooklynTypeRegistry.java ---------------------------------------------------------------------- diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/BasicBrooklynTypeRegistry.java index 0000000,b36be34..5d4bbf6 mode 000000,100644..100644 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/BasicBrooklynTypeRegistry.java +++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/BasicBrooklynTypeRegistry.java @@@ -1,0 -1,210 +1,296 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.brooklyn.core.typereg; + ++import java.util.Map; + import java.util.Set; + + import javax.annotation.Nullable; + + import org.apache.brooklyn.api.catalog.BrooklynCatalog; + import org.apache.brooklyn.api.catalog.CatalogItem; + import org.apache.brooklyn.api.catalog.CatalogItem.CatalogItemType; + import org.apache.brooklyn.api.internal.AbstractBrooklynObjectSpec; + import org.apache.brooklyn.api.mgmt.ManagementContext; + import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry; + import org.apache.brooklyn.api.typereg.RegisteredType; + import org.apache.brooklyn.api.typereg.RegisteredType.TypeImplementationPlan; + import org.apache.brooklyn.api.typereg.RegisteredTypeLoadingContext; + import org.apache.brooklyn.core.catalog.internal.BasicBrooklynCatalog; + import org.apache.brooklyn.core.catalog.internal.CatalogItemBuilder; + import org.apache.brooklyn.core.catalog.internal.CatalogUtils; -import org.apache.brooklyn.util.collections.MutableList; ++import org.apache.brooklyn.test.Asserts; ++import org.apache.brooklyn.util.collections.MutableMap; + import org.apache.brooklyn.util.collections.MutableSet; + import org.apache.brooklyn.util.exceptions.Exceptions; + import org.apache.brooklyn.util.guava.Maybe; + import org.apache.brooklyn.util.text.Identifiers; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + -import com.google.api.client.util.Preconditions; ++import com.google.common.annotations.Beta; ++import com.google.common.base.Preconditions; + import com.google.common.base.Predicate; + import com.google.common.base.Predicates; + import com.google.common.collect.Iterables; + + public class BasicBrooklynTypeRegistry implements BrooklynTypeRegistry { + - @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(BasicBrooklynTypeRegistry.class); + + private ManagementContext mgmt; ++ private Map<String,RegisteredType> localRegisteredTypes = MutableMap.of(); + + public BasicBrooklynTypeRegistry(ManagementContext mgmt) { + this.mgmt = mgmt; + } + + public Iterable<RegisteredType> getAll() { - return getAll(Predicates.alwaysTrue()); ++ return getMatching(Predicates.alwaysTrue()); + } + ++ private Iterable<RegisteredType> getAllWithoutCatalog(Predicate<? super RegisteredType> filter) { ++ // TODO thread safety ++ // TODO optimisation? make indexes and look up? ++ return Iterables.filter(localRegisteredTypes.values(), filter); ++ } ++ ++ private Maybe<RegisteredType> getExactWithoutLegacyCatalog(String symbolicName, String version, RegisteredTypeLoadingContext constraint) { ++ // TODO look in any nested/private registries ++ RegisteredType item = localRegisteredTypes.get(symbolicName+":"+version); ++ return RegisteredTypes.tryValidate(item, constraint); ++ } ++ + @SuppressWarnings("deprecation") + @Override - public Iterable<RegisteredType> getAll(Predicate<? super RegisteredType> filter) { - return Iterables.filter(Iterables.transform(mgmt.getCatalog().getCatalogItems(), RegisteredTypes.CI_TO_RT), filter); ++ public Iterable<RegisteredType> getMatching(Predicate<? super RegisteredType> filter) { ++ return Iterables.filter(Iterables.concat( ++ getAllWithoutCatalog(filter), ++ Iterables.transform(mgmt.getCatalog().getCatalogItems(), RegisteredTypes.CI_TO_RT)), ++ filter); + } + + @SuppressWarnings("deprecation") - private RegisteredType get(String symbolicName, String version, RegisteredTypeLoadingContext constraint) { - // probably constraint is not useful? - if (constraint==null) constraint = RegisteredTypeLoadingContexts.any(); ++ private Maybe<RegisteredType> getSingle(String symbolicNameOrAliasIfNoVersion, final String versionFinal, final RegisteredTypeLoadingContext contextFinal) { ++ RegisteredTypeLoadingContext context = contextFinal; ++ if (context==null) context = RegisteredTypeLoadingContexts.any(); ++ String version = versionFinal; + if (version==null) version = BrooklynCatalog.DEFAULT_VERSION; ++ ++ if (!BrooklynCatalog.DEFAULT_VERSION.equals(version)) { ++ // normal code path when version is supplied ++ ++ Maybe<RegisteredType> type = getExactWithoutLegacyCatalog(symbolicNameOrAliasIfNoVersion, version, context); ++ if (type.isPresent()) return type; ++ } ++ ++ if (BrooklynCatalog.DEFAULT_VERSION.equals(version)) { ++ // alternate code path, if version blank or default ++ ++ Iterable<RegisteredType> types = getMatching(Predicates.and(RegisteredTypePredicates.symbolicName(symbolicNameOrAliasIfNoVersion), ++ RegisteredTypePredicates.satisfies(context))); ++ if (Iterables.isEmpty(types)) { ++ // look for alias if no exact symbolic name match AND no version is specified ++ types = getMatching(Predicates.and(RegisteredTypePredicates.alias(symbolicNameOrAliasIfNoVersion), ++ RegisteredTypePredicates.satisfies(context) ) ); ++ // if there are multiple symbolic names then throw? ++ Set<String> uniqueSymbolicNames = MutableSet.of(); ++ for (RegisteredType t: types) { ++ uniqueSymbolicNames.add(t.getSymbolicName()); ++ } ++ if (uniqueSymbolicNames.size()>1) { ++ String message = "Multiple matches found for alias '"+symbolicNameOrAliasIfNoVersion+"': "+uniqueSymbolicNames+"; " ++ + "refusing to select any."; ++ log.warn(message); ++ return Maybe.absent(message); ++ } ++ } ++ if (!Iterables.isEmpty(types)) { ++ RegisteredType type = RegisteredTypes.getBestVersion(types); ++ if (type!=null) return Maybe.of(type); ++ } ++ } + - // TODO lookup here, using constraints ++ // missing case is to look for exact version in legacy catalog ++ CatalogItem<?, ?> item = mgmt.getCatalog().getCatalogItem(symbolicNameOrAliasIfNoVersion, version); ++ if (item!=null) ++ return Maybe.of( RegisteredTypes.CI_TO_RT.apply( item ) ); + - // fallback to catalog - CatalogItem<?, ?> item = mgmt.getCatalog().getCatalogItem(symbolicName, version); - // TODO apply constraint - return RegisteredTypes.CI_TO_RT.apply( item ); ++ return Maybe.absent("No matches for "+symbolicNameOrAliasIfNoVersion+ ++ (versionFinal!=null ? ":"+versionFinal : "")+ ++ (contextFinal!=null ? " ("+contextFinal+")" : "") ); + } + + @Override + public RegisteredType get(String symbolicName, String version) { - return get(symbolicName, version, null); ++ return getSingle(symbolicName, version, null).orNull(); + } + - private RegisteredType get(String symbolicNameWithOptionalVersion, RegisteredTypeLoadingContext constraint) { - // probably constraint is not useful? ++ @Override ++ public RegisteredType get(String symbolicNameWithOptionalVersion, RegisteredTypeLoadingContext context) { ++ return getMaybe(symbolicNameWithOptionalVersion, context).orNull(); ++ } ++ @Override ++ public Maybe<RegisteredType> getMaybe(String symbolicNameWithOptionalVersion, RegisteredTypeLoadingContext context) { ++ Maybe<RegisteredType> r1 = null; + if (CatalogUtils.looksLikeVersionedId(symbolicNameWithOptionalVersion)) { + String symbolicName = CatalogUtils.getSymbolicNameFromVersionedId(symbolicNameWithOptionalVersion); + String version = CatalogUtils.getVersionFromVersionedId(symbolicNameWithOptionalVersion); - return get(symbolicName, version, constraint); - } else { - return get(symbolicNameWithOptionalVersion, BrooklynCatalog.DEFAULT_VERSION, constraint); ++ r1 = getSingle(symbolicName, version, context); ++ if (r1.isPresent()) return r1; + } ++ ++ Maybe<RegisteredType> r2 = getSingle(symbolicNameWithOptionalVersion, BrooklynCatalog.DEFAULT_VERSION, context); ++ if (r2.isPresent() || r1==null) return r2; ++ return r1; + } + + @Override + public RegisteredType get(String symbolicNameWithOptionalVersion) { + return get(symbolicNameWithOptionalVersion, (RegisteredTypeLoadingContext)null); + } + + @Override + public <SpecT extends AbstractBrooklynObjectSpec<?,?>> SpecT createSpec(RegisteredType type, @Nullable RegisteredTypeLoadingContext constraint, Class<SpecT> specSuperType) { + Preconditions.checkNotNull(type, "type"); + if (type.getKind()!=RegisteredTypeKind.SPEC) { + throw new IllegalStateException("Cannot create spec from type "+type+" (kind "+type.getKind()+")"); + } + return createSpec(type, type.getPlan(), type.getSymbolicName(), type.getVersion(), type.getSuperTypes(), constraint, specSuperType); + } + + @SuppressWarnings({ "deprecation", "unchecked", "rawtypes" }) + private <SpecT extends AbstractBrooklynObjectSpec<?,?>> SpecT createSpec( + RegisteredType type, + TypeImplementationPlan plan, + @Nullable String symbolicName, @Nullable String version, Set<Object> superTypes, + @Nullable RegisteredTypeLoadingContext constraint, Class<SpecT> specSuperType) { + // TODO type is only used to call to "transform"; we should perhaps change transform so it doesn't need the type? + if (constraint!=null) { + if (constraint.getExpectedKind()!=null && constraint.getExpectedKind()!=RegisteredTypeKind.SPEC) { + throw new IllegalStateException("Cannot create spec with constraint "+constraint); + } + if (constraint.getAlreadyEncounteredTypes().contains(symbolicName)) { + // avoid recursive cycle + // TODO implement using java if permitted + } + } + constraint = RegisteredTypeLoadingContexts.withSpecSuperType(constraint, specSuperType); + + Maybe<Object> result = TypePlanTransformers.transform(mgmt, type, constraint); + if (result.isPresent()) return (SpecT) result.get(); + + // fallback: look up in (legacy) catalog + // TODO remove once all transformers are available in the new style + CatalogItem item = symbolicName!=null ? (CatalogItem) mgmt.getCatalog().getCatalogItem(symbolicName, version) : null; + if (item==null) { + // if not in catalog (because loading a new item?) then look up item based on type + // (only really used in tests; possibly also for any recursive legacy transformers we might have to create a CI; cross that bridge when we come to it) + CatalogItemType ciType = CatalogItemType.ofTargetClass( (Class)constraint.getExpectedJavaSuperType() ); + if (ciType==null) { + // throw -- not supported for non-spec types + result.get(); + } + item = CatalogItemBuilder.newItem(ciType, + symbolicName!=null ? symbolicName : Identifiers.makeRandomId(8), + version!=null ? version : BasicBrooklynCatalog.DEFAULT_VERSION) + .plan((String)plan.getPlanData()) + .build(); + } + try { + return (SpecT) BasicBrooklynCatalog.internalCreateSpecLegacy(mgmt, item, constraint.getAlreadyEncounteredTypes(), false); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + // for now, combine this failure with the original + try { + result.get(); + // above will throw -- so won't come here + throw new IllegalStateException("should have failed getting type resolution for "+symbolicName); + } catch (Exception e0) { + Set<Exception> exceptionsInOrder = MutableSet.of(); + if (e0.toString().indexOf("none of the available transformers")>=0) { + // put the legacy exception first if none of the new transformers support the type + // (until the new transformer is the primary pathway) + exceptionsInOrder.add(e); + } + exceptionsInOrder.add(e0); + exceptionsInOrder.add(e); + throw Exceptions.create("Unable to instantiate "+(symbolicName==null ? "item" : symbolicName), exceptionsInOrder); + } + } + } + + @Override + public <SpecT extends AbstractBrooklynObjectSpec<?, ?>> SpecT createSpecFromPlan(String planFormat, Object planData, RegisteredTypeLoadingContext optionalConstraint, Class<SpecT> optionalSpecSuperType) { + return createSpec(RegisteredTypes.spec(null, null, new BasicTypeImplementationPlan(planFormat, planData), null), + optionalConstraint, optionalSpecSuperType); + } + + @Override + public <T> T createBean(RegisteredType type, RegisteredTypeLoadingContext constraint, Class<T> optionalResultSuperType) { + Preconditions.checkNotNull(type, "type"); + if (type.getKind()!=RegisteredTypeKind.SPEC) { + throw new IllegalStateException("Cannot create spec from type "+type+" (kind "+type.getKind()+")"); + } + if (constraint!=null) { + if (constraint.getExpectedKind()!=null && constraint.getExpectedKind()!=RegisteredTypeKind.SPEC) { + throw new IllegalStateException("Cannot create spec with constraint "+constraint); + } + if (constraint.getAlreadyEncounteredTypes().contains(type.getSymbolicName())) { + // avoid recursive cycle - // TODO implement using java if permitted ++ // TODO create type using java if permitted? ++ // OR remove this creator from those permitted + } + } + constraint = RegisteredTypeLoadingContexts.withBeanSuperType(constraint, optionalResultSuperType); + + @SuppressWarnings("unchecked") + T result = (T) TypePlanTransformers.transform(mgmt, type, constraint).get(); + return result; + } + + @Override + public <T> T createBeanFromPlan(String planFormat, Object planData, RegisteredTypeLoadingContext optionalConstraint, Class<T> optionalBeanSuperType) { + return createBean(RegisteredTypes.bean(null, null, new BasicTypeImplementationPlan(planFormat, planData), null), + optionalConstraint, optionalBeanSuperType); + } + ++ @Beta // API is stabilising ++ public void addToLocalUnpersistedTypeRegistry(RegisteredType type, boolean canForce) { ++ Preconditions.checkNotNull(type); ++ Preconditions.checkNotNull(type.getSymbolicName()); ++ Preconditions.checkNotNull(type.getVersion()); ++ Preconditions.checkNotNull(type.getId()); ++ if (!type.getId().equals(type.getSymbolicName()+":"+type.getVersion())) ++ Asserts.fail("Registered type "+type+" has ID / symname mismatch"); ++ ++ RegisteredType oldType = mgmt.getTypeRegistry().get(type.getId()); ++ if (oldType==null || canForce) { ++ log.debug("Inserting "+type+" into "+this); ++ localRegisteredTypes.put(type.getId(), type); ++ } else { ++ if (oldType == type) { ++ // ignore if same instance ++ // (equals not yet implemented, so would be the same, but misleading) ++ return; ++ } ++ throw new IllegalStateException("Cannot add "+type+" to catalog; different "+oldType+" is already present"); ++ } ++ } + } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/BasicRegisteredType.java ---------------------------------------------------------------------- diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/BasicRegisteredType.java index 0000000,3905d65..05f0773 mode 000000,100644..100644 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/BasicRegisteredType.java +++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/BasicRegisteredType.java @@@ -1,0 -1,135 +1,149 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.brooklyn.core.typereg; + + import java.util.Collection; + import java.util.List; + import java.util.Set; + + import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry.RegisteredTypeKind; + import org.apache.brooklyn.api.typereg.OsgiBundleWithUrl; + import org.apache.brooklyn.api.typereg.RegisteredType; + import org.apache.brooklyn.util.collections.MutableList; + import org.apache.brooklyn.util.collections.MutableSet; + import org.apache.brooklyn.util.core.config.ConfigBag; + import org.apache.brooklyn.util.javalang.JavaClassNames; + + import com.google.common.annotations.Beta; + import com.google.common.collect.ImmutableSet; + + /** Instances are usually created by methods in {@link RegisteredTypes}. */ + public class BasicRegisteredType implements RegisteredType { + ++ final RegisteredTypeKind kind; + final String symbolicName; + final String version; - final RegisteredTypeKind kind; + - Set<Object> superTypes = MutableSet.of(); - List<OsgiBundleWithUrl> bundles = MutableList.of(); ++ final List<OsgiBundleWithUrl> bundles = MutableList.of(); + String displayName; + String description; + String iconUrl; ++ ++ final Set<Object> superTypes = MutableSet.of(); + boolean deprecated; + boolean disabled; ++ final Set<String> aliases = MutableSet.of(); ++ final Set<Object> tags = MutableSet.of(); + + TypeImplementationPlan implementationPlan; + + private transient ConfigBag cache = new ConfigBag(); + + BasicRegisteredType(RegisteredTypeKind kind, String symbolicName, String version, TypeImplementationPlan implementationPlan) { + this.kind = kind; + this.symbolicName = symbolicName; + this.version = version; + this.implementationPlan = implementationPlan; + } + + @Override + public String getId() { + return symbolicName + (version!=null ? ":"+version : ""); + } ++ ++ @Override ++ public RegisteredTypeKind getKind() { ++ return kind; ++ } + + @Override + public String getSymbolicName() { + return symbolicName; + } + + @Override + public String getVersion() { + return version; + } - - @Override - public RegisteredTypeKind getKind() { - return kind; - } + + @Override + public Collection<OsgiBundleWithUrl> getLibraries() { + return ImmutableSet.copyOf(bundles); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public String getIconUrl() { + return iconUrl; + } + + @Override ++ public Set<Object> getSuperTypes() { ++ return ImmutableSet.copyOf(superTypes); ++ } ++ ++ @Override + public boolean isDisabled() { + return disabled; + } + + @Override + public boolean isDeprecated() { + return deprecated; + } + + @Override - public Set<Object> getSuperTypes() { - return ImmutableSet.copyOf(superTypes); ++ public Set<String> getAliases() { ++ return ImmutableSet.copyOf(aliases); ++ } ++ ++ @Override ++ public Set<Object> getTags() { ++ return ImmutableSet.copyOf(tags); + } + ++ + @Beta // TODO depending how useful this is, it might be better to replace by a static WeakHashMap in RegisteredTypes + public ConfigBag getCache() { + return cache; + } + + @Override + public TypeImplementationPlan getPlan() { + return implementationPlan; + } + + @Override + public String toString() { + return JavaClassNames.simpleClassName(this)+"["+getId()+ + (isDisabled() ? ";DISABLED" : "")+ + (isDeprecated() ? ";deprecated" : "")+ + (getPlan()!=null ? ";"+getPlan().getPlanFormat() : "")+ + "]"; + } + }
