http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java deleted file mode 100644 index 57d4712..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import static org.apache.brooklyn.util.JavaGroovyEquivalents.mapOf; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.mgmt.SubscriptionContext; -import org.apache.brooklyn.api.mgmt.SubscriptionHandle; -import org.apache.brooklyn.api.mgmt.SubscriptionManager; -import org.apache.brooklyn.api.sensor.Sensor; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; - -import groovy.lang.Closure; - -/** - * A {@link SubscriptionContext} for an entity or other user of a {@link SubscriptionManager}. - */ -public class BasicSubscriptionContext implements SubscriptionContext { - - private static final Logger LOG = LoggerFactory.getLogger(BasicSubscriptionContext.class); - - private final SubscriptionManager manager; - private final Object subscriber; - private final Map<String,Object> flags; - - public BasicSubscriptionContext(SubscriptionManager manager, Object subscriber) { - this(Collections.<String,Object>emptyMap(), manager, subscriber); - } - - public BasicSubscriptionContext(Map<String, ?> flags, SubscriptionManager manager, Object subscriber) { - this.manager = manager; - this.subscriber = subscriber; - this.flags = mapOf("subscriber", subscriber); - if (flags!=null) this.flags.putAll(flags); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, Closure c) { - return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, c); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribe(Map<String, ?> newFlags, Entity producer, Sensor<T> sensor, Closure c) { - return subscribe(newFlags, producer, sensor, toSensorEventListener(c)); - } - - @Override - public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { - return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, listener); - } - - @Override - public <T> SubscriptionHandle subscribe(Map<String, ?> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { - Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags); - if (newFlags != null) subscriptionFlags.putAll(newFlags); - return manager.subscribe(subscriptionFlags, producer, sensor, listener); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, Closure c) { - return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, c); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribeToChildren(Map<String, Object> newFlags, Entity parent, Sensor<T> sensor, Closure c) { - return subscribeToChildren(newFlags, parent, sensor, toSensorEventListener(c)); - } - - @Override - public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { - return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, listener); - } - - @Override - public <T> SubscriptionHandle subscribeToChildren(Map<String, Object> newFlags, Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { - Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags); - if (newFlags != null) subscriptionFlags.putAll(newFlags); - return manager.subscribeToChildren(subscriptionFlags, parent, sensor, listener); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, Closure c) { - return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, c); - } - - @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribeToMembers(Map<String, Object> newFlags, Group parent, Sensor<T> sensor, Closure c) { - return subscribeToMembers(newFlags, parent, sensor, toSensorEventListener(c)); - } - - @Override - public <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { - return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, listener); - } - - @Override - public <T> SubscriptionHandle subscribeToMembers(Map<String, Object> newFlags, Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { - Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags); - if (newFlags != null) subscriptionFlags.putAll(newFlags); - return manager.subscribeToMembers(subscriptionFlags, parent, sensor, listener); - } - - @SuppressWarnings("rawtypes") - @Override - public boolean unsubscribe(SubscriptionHandle subscriptionId) { - Preconditions.checkNotNull(subscriptionId, "subscriptionId must not be null"); - Preconditions.checkArgument(Objects.equal(subscriber, ((Subscription) subscriptionId).subscriber), "The subscriptionId is for a different "+subscriber+"; expected "+((Subscription) subscriptionId).subscriber); - return manager.unsubscribe(subscriptionId); - } - - /** @see SubscriptionManager#publish(SensorEvent) */ - @Override - public <T> void publish(SensorEvent<T> event) { - manager.publish(event); - } - - /** Return the subscriptions associated with this context */ - @Override - public Set<SubscriptionHandle> getSubscriptions() { - return manager.getSubscriptionsForSubscriber(subscriber); - } - - @Override - public int unsubscribeAll() { - int count = 0; - - // To avoid ConcurrentModificationException when copying subscriptions, need to synchronize on it - Set<SubscriptionHandle> subscriptions = getSubscriptions(); - Collection<SubscriptionHandle> subscriptionsCopy; - synchronized (subscriptions) { - subscriptionsCopy = ImmutableList.copyOf(subscriptions); - } - - for (SubscriptionHandle s : subscriptionsCopy) { - count++; - boolean result = unsubscribe(s); - if (!result) LOG.warn("When unsubscribing from all of {}, unsubscribe of {} return false", subscriber, s); - } - return count; - } - - @SuppressWarnings("rawtypes") - private <T> SensorEventListener<T> toSensorEventListener(final Closure c) { - return new SensorEventListener<T>() { - @Override public void onEvent(SensorEvent<T> event) { - c.call(event); - } - }; - } -}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java deleted file mode 100644 index c8ef0e6..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java +++ /dev/null @@ -1,625 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.ConcurrentModificationException; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.mgmt.HasTaskChildren; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.internal.BrooklynProperties; -import org.apache.brooklyn.core.internal.storage.BrooklynStorage; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedStream; -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.collections.MutableSet; -import org.apache.brooklyn.util.core.task.BasicExecutionManager; -import org.apache.brooklyn.util.core.task.ExecutionListener; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.javalang.MemoryUsageTracker; -import org.apache.brooklyn.util.text.Strings; -import org.apache.brooklyn.util.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; -import com.google.common.annotations.Beta; -import com.google.common.collect.Iterables; - -/** - * Deletes record of old tasks, to prevent space leaks and the eating up of more and more memory. - * - * The deletion policy is configurable: - * <ul> - * <li>Period - how frequently to look at the existing tasks to delete some, if required - * <li>Max task age - the time after which a completed task will be automatically deleted - * (i.e. any root task completed more than maxTaskAge ago will be deleted) - * <li>Max tasks per <various categories> - the maximum number of tasks to be kept for a given tag, - * split into categories based on what is seeming to be useful - * </ul> - * - * The default is to check with a period of one minute, deleting tasks after 30 days, - * and keeping at most 100000 tasks in the system, - * max 1000 tasks per entity, 50 per effector within that entity, and 50 per other non-effector tag - * within that entity (or global if not attached to an entity). - * - * @author aled - */ -public class BrooklynGarbageCollector { - - private static final Logger LOG = LoggerFactory.getLogger(BrooklynGarbageCollector.class); - - public static final ConfigKey<Duration> GC_PERIOD = ConfigKeys.newDurationConfigKey( - "brooklyn.gc.period", "the period for checking if any tasks need to be deleted", - Duration.minutes(1)); - - public static final ConfigKey<Boolean> DO_SYSTEM_GC = ConfigKeys.newBooleanConfigKey( - "brooklyn.gc.doSystemGc", "whether to periodically call System.gc()", false); - - /** - * should we check for tasks which are submitted by another but backgrounded, i.e. not a child of that task? - * default to yes, despite it can be some extra loops, to make sure we GC them promptly. - * @since 0.7.0 */ - // work offender is {@link DynamicSequentialTask} internal job tracker, but it is marked - // transient so it is destroyed prompty; there may be others, however; - // but OTOH it might be expensive to check for these all the time! - // TODO probably we can set this false (remove this and related code), - // and just rely on usual GC to pick up background tasks; the lifecycle of background task - // should normally be independent of the submitter. (DST was the exception, and marking - // transient there fixes the main problem, which is when the submitter is GC'd but the submitted is not, - // and we don't want the submitted to show up at the root in the GUI, which it will if its - // submitter has been GC'd) - @Beta - public static final ConfigKey<Boolean> CHECK_SUBTASK_SUBMITTERS = ConfigKeys.newBooleanConfigKey( - "brooklyn.gc.checkSubtaskSubmitters", "whether for subtasks to check the submitters", true); - - public static final ConfigKey<Integer> MAX_TASKS_PER_TAG = ConfigKeys.newIntegerConfigKey( - "brooklyn.gc.maxTasksPerTag", - "the maximum number of tasks to be kept for a given tag " - + "within an execution context (e.g. entity); " - + "some broad-brush tags are excluded, and if an entity has multiple tags all tag counts must be full", - 50); - - public static final ConfigKey<Integer> MAX_TASKS_PER_ENTITY = ConfigKeys.newIntegerConfigKey( - "brooklyn.gc.maxTasksPerEntity", - "the maximum number of tasks to be kept for a given entity", - 1000); - - public static final ConfigKey<Integer> MAX_TASKS_GLOBAL = ConfigKeys.newIntegerConfigKey( - "brooklyn.gc.maxTasksGlobal", - "the maximum number of tasks to be kept across the entire system", - 100000); - - public static final ConfigKey<Duration> MAX_TASK_AGE = ConfigKeys.newDurationConfigKey( - "brooklyn.gc.maxTaskAge", - "the duration after which a completed task will be automatically deleted", - Duration.days(30)); - - protected final static Comparator<Task<?>> TASKS_OLDEST_FIRST_COMPARATOR = new Comparator<Task<?>>() { - @Override public int compare(Task<?> t1, Task<?> t2) { - long end1 = t1.getEndTimeUtc(); - long end2 = t2.getEndTimeUtc(); - return (end1 < end2) ? -1 : ((end1 == end2) ? 0 : 1); - } - }; - - private final BasicExecutionManager executionManager; - private final BrooklynStorage storage; - private final BrooklynProperties brooklynProperties; - private final ScheduledExecutorService executor; - private ScheduledFuture<?> activeCollector; - private Map<Entity,Task<?>> unmanagedEntitiesNeedingGc = new LinkedHashMap<Entity, Task<?>>(); - - private Duration gcPeriod; - private final boolean doSystemGc; - private volatile boolean running = true; - - public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExecutionManager executionManager, BrooklynStorage storage) { - this.executionManager = executionManager; - this.storage = storage; - this.brooklynProperties = brooklynProperties; - - doSystemGc = brooklynProperties.getConfig(DO_SYSTEM_GC); - - executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override public Thread newThread(Runnable r) { - return new Thread(r, "brooklyn-gc"); - }}); - - executionManager.addListener(new ExecutionListener() { - @Override public void onTaskDone(Task<?> task) { - BrooklynGarbageCollector.this.onTaskDone(task); - }}); - - scheduleCollector(true); - } - - protected synchronized void scheduleCollector(boolean canInterruptCurrent) { - if (activeCollector != null) activeCollector.cancel(canInterruptCurrent); - - gcPeriod = brooklynProperties.getConfig(GC_PERIOD); - if (gcPeriod!=null) { - activeCollector = executor.scheduleWithFixedDelay( - new Runnable() { - @Override public void run() { - gcIteration(); - } - }, - gcPeriod.toMillisecondsRoundingUp(), - gcPeriod.toMillisecondsRoundingUp(), - TimeUnit.MILLISECONDS); - } - } - - /** force a round of Brooklyn garbage collection */ - public void gcIteration() { - try { - logUsage("brooklyn gc (before)"); - gcTasks(); - logUsage("brooklyn gc (after)"); - - if (doSystemGc) { - // Can be very useful when tracking down OOMEs etc, where a lot of tasks are executing - // Empirically observed that (on OS X jvm at least) calling twice blocks - logs a significant - // amount of memory having been released, as though a full-gc had been run. But this is highly - // dependent on the JVM implementation. - System.gc(); System.gc(); - logUsage("brooklyn gc (after system gc)"); - } - } catch (Throwable t) { - Exceptions.propagateIfFatal(t); - LOG.warn("Error during management-context GC: "+t, t); - // previously we bailed on all errors, but I don't think we should do that -Alex - } - } - - public void logUsage(String prefix) { - if (LOG.isDebugEnabled()) - LOG.debug(prefix+" - using "+getUsageString()); - } - - public static String makeBasicUsageString() { - return Strings.makeSizeString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())+" / "+ - Strings.makeSizeString(Runtime.getRuntime().totalMemory()) + " memory" + - " ("+Strings.makeSizeString(MemoryUsageTracker.SOFT_REFERENCES.getBytesUsed()) + " soft); "+ - Thread.activeCount()+" threads"; - } - - public String getUsageString() { - return makeBasicUsageString()+"; "+ - "storage: " + storage.getStorageMetrics() + "; " + - "tasks: " + - executionManager.getNumActiveTasks()+" active, "+ - executionManager.getNumIncompleteTasks()+" unfinished; "+ - executionManager.getNumInMemoryTasks()+" remembered, "+ - executionManager.getTotalTasksSubmitted()+" total submitted)"; - } - - public void shutdownNow() { - running = false; - if (activeCollector != null) activeCollector.cancel(true); - if (executor != null) executor.shutdownNow(); - } - - public void onUnmanaged(Entity entity) { - // defer task deletions until the entity is completely unmanaged - // (this is usually invoked during the stop sequence) - synchronized (unmanagedEntitiesNeedingGc) { - unmanagedEntitiesNeedingGc.put(entity, Tasks.current()); - } - } - - public void deleteTasksForEntity(Entity entity) { - // remove all references to this entity from tasks - executionManager.deleteTag(entity); - executionManager.deleteTag(BrooklynTaskTags.tagForContextEntity(entity)); - executionManager.deleteTag(BrooklynTaskTags.tagForCallerEntity(entity)); - executionManager.deleteTag(BrooklynTaskTags.tagForTargetEntity(entity)); - } - - public void onUnmanaged(Location loc) { - // No-op currently; no tasks are tracked through their location - } - - public void onTaskDone(Task<?> task) { - if (shouldDeleteTaskImmediately(task)) { - executionManager.deleteTask(task); - } - } - - /** @deprecated since 0.7.0, method moved internal until semantics are clarified; see also {@link #shouldDeleteTaskImmediately(Task)} */ - @Deprecated - public boolean shouldDeleteTask(Task<?> task) { - return shouldDeleteTaskImmediately(task); - } - /** whether this task should be deleted on completion, - * because it is transient, or because it is submitted background without much context information */ - protected boolean shouldDeleteTaskImmediately(Task<?> task) { - if (!task.isDone()) return false; - - Set<Object> tags = task.getTags(); - if (tags.contains(ManagementContextInternal.TRANSIENT_TASK_TAG)) - return true; - if (tags.contains(ManagementContextInternal.EFFECTOR_TAG) || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) - return false; - - if (task.getSubmittedByTask()!=null) { - Task<?> parent = task.getSubmittedByTask(); - if (executionManager.getTask(parent.getId())==null) { - // parent is already cleaned up - return true; - } - if (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task)) { - // it is a child, let the parent manage this task's death - return false; - } - Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task); - if (associatedEntity!=null) { - // this is associated to an entity; destroy only if the entity is unmanaged - return !Entities.isManaged(associatedEntity); - } - // if not associated to an entity, then delete immediately - return true; - } - - // e.g. scheduled tasks, sensor events, etc - // TODO (in future may keep some of these with another limit, based on a new TagCategory) - // there may also be a server association for server-side tasks which should be kept - // (but be careful not to keep too many subscriptions!) - - return true; - } - - /** - * Deletes old tasks. The age/number of tasks to keep is controlled by fields like - * {@link #maxTasksPerTag} and {@link #maxTaskAge}. - */ - protected synchronized int gcTasks() { - // TODO Must be careful with memory usage here: have seen OOME if we get crazy lots of tasks. - // hopefully the use new limits, filters, and use of live lists in some places (added Sep 2014) will help. - // - // An option is for getTasksWithTag(tag) to return an ArrayList rather than a LinkedHashSet. That - // is a far more memory efficient data structure (e.g. 4 bytes overhead per object rather than - // 32 bytes overhead per object for HashSet). - // - // More notes on optimization is in the history of this file. - - if (!running) return 0; - - Duration newPeriod = brooklynProperties.getConfig(GC_PERIOD); - if (!Objects.equal(gcPeriod, newPeriod)) { - // caller has changed period, reschedule on next run - scheduleCollector(false); - } - - expireUnmanagedEntityTasks(); - expireAgedTasks(); - expireTransientTasks(); - - // now look at overcapacity tags, non-entity tags first - - Set<Object> taskTags = executionManager.getTaskTags(); - - int maxTasksPerEntity = brooklynProperties.getConfig(MAX_TASKS_PER_ENTITY); - int maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG); - - Map<Object,AtomicInteger> taskNonEntityTagsOverCapacity = MutableMap.of(); - Map<Object,AtomicInteger> taskEntityTagsOverCapacity = MutableMap.of(); - - Map<Object,AtomicInteger> taskAllTagsOverCapacity = MutableMap.of(); - - for (Object tag : taskTags) { - if (isTagIgnoredForGc(tag)) continue; - - Set<Task<?>> tasksWithTag = executionManager.tasksWithTagLiveOrNull(tag); - if (tasksWithTag==null) continue; - AtomicInteger overA = null; - if (tag instanceof WrappedEntity) { - int over = tasksWithTag.size() - maxTasksPerEntity; - if (over>0) { - overA = new AtomicInteger(over); - taskEntityTagsOverCapacity.put(tag, overA); - } - } else { - int over = tasksWithTag.size() - maxTasksPerTag; - if (over>0) { - overA = new AtomicInteger(over); - taskNonEntityTagsOverCapacity.put(tag, overA); - } - } - if (overA!=null) { - taskAllTagsOverCapacity.put(tag, overA); - } - } - - int deletedCount = 0; - deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false); - deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true); - deletedCount += expireSubTasksWhoseSubmitterIsExpired(); - - int deletedGlobally = expireIfOverCapacityGlobally(); - deletedCount += deletedGlobally; - if (deletedGlobally>0) deletedCount += expireSubTasksWhoseSubmitterIsExpired(); - - return deletedCount; - } - - protected static boolean isTagIgnoredForGc(Object tag) { - if (tag == null) return true; - if (tag.equals(ManagementContextInternal.EFFECTOR_TAG)) return true; - if (tag.equals(ManagementContextInternal.SUB_TASK_TAG)) return true; - if (tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return true; - if (tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG)) return true; - if (tag instanceof WrappedStream) { - return true; - } - - return false; - } - - protected void expireUnmanagedEntityTasks() { - Iterator<Entry<Entity, Task<?>>> ei; - synchronized (unmanagedEntitiesNeedingGc) { - ei = MutableSet.copyOf(unmanagedEntitiesNeedingGc.entrySet()).iterator(); - } - while (ei.hasNext()) { - Entry<Entity, Task<?>> ee = ei.next(); - if (Entities.isManaged(ee.getKey())) continue; - if (ee.getValue()!=null && !ee.getValue().isDone()) continue; - deleteTasksForEntity(ee.getKey()); - synchronized (unmanagedEntitiesNeedingGc) { - unmanagedEntitiesNeedingGc.remove(ee.getKey()); - } - } - } - - protected void expireAgedTasks() { - Duration maxTaskAge = brooklynProperties.getConfig(MAX_TASK_AGE); - - Collection<Task<?>> allTasks = executionManager.allTasksLive(); - Collection<Task<?>> tasksToDelete = MutableList.of(); - - try { - for (Task<?> task: allTasks) { - if (!task.isDone()) continue; - if (BrooklynTaskTags.isSubTask(task)) continue; - - if (maxTaskAge.isShorterThan(Duration.sinceUtc(task.getEndTimeUtc()))) - tasksToDelete.add(task); - } - - } catch (ConcurrentModificationException e) { - // delete what we've found so far - LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e); - } - - for (Task<?> task: tasksToDelete) { - executionManager.deleteTask(task); - } - } - - protected void expireTransientTasks() { - Set<Task<?>> transientTasks = executionManager.getTasksWithTag(BrooklynTaskTags.TRANSIENT_TASK_TAG); - for (Task<?> t: transientTasks) { - if (!t.isDone()) continue; - executionManager.deleteTask(t); - } - } - - protected int expireSubTasksWhoseSubmitterIsExpired() { - // ideally we wouldn't have this; see comments on CHECK_SUBTASK_SUBMITTERS - if (!brooklynProperties.getConfig(CHECK_SUBTASK_SUBMITTERS)) - return 0; - - Collection<Task<?>> allTasks = executionManager.allTasksLive(); - Collection<Task<?>> tasksToDelete = MutableList.of(); - try { - for (Task<?> task: allTasks) { - if (!task.isDone()) continue; - Task<?> submitter = task.getSubmittedByTask(); - // if we've leaked, ie a subtask which is not a child task, - // and the submitter is GC'd, then delete this also - if (submitter!=null && submitter.isDone() && executionManager.getTask(submitter.getId())==null) { - tasksToDelete.add(task); - } - } - - } catch (ConcurrentModificationException e) { - // delete what we've found so far - LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e); - } - - for (Task<?> task: tasksToDelete) { - executionManager.deleteTask(task); - } - return tasksToDelete.size(); - } - - protected enum TagCategory { - ENTITY, NON_ENTITY_NORMAL; - - public boolean acceptsTag(Object tag) { - if (isTagIgnoredForGc(tag)) return false; - if (tag instanceof WrappedEntity) return this==ENTITY; - if (this==ENTITY) return false; - return true; - } - } - - - /** expires tasks which are over-capacity in all their non-entity tag categories, returned count */ - protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) { - if (emptyFilterNeeded) { - // previous run may have decremented counts - MutableList<Object> nowOkayTags = MutableList.of(); - for (Map.Entry<Object,AtomicInteger> entry: taskTagsInCategoryOverCapacity.entrySet()) { - if (entry.getValue().get()<=0) nowOkayTags.add(entry.getKey()); - } - for (Object tag: nowOkayTags) taskTagsInCategoryOverCapacity.remove(tag); - } - - if (taskTagsInCategoryOverCapacity.isEmpty()) - return 0; - - Collection<Task<?>> tasks = executionManager.allTasksLive(); - List<Task<?>> tasksToConsiderDeleting = MutableList.of(); - try { - for (Task<?> task: tasks) { - if (!task.isDone()) continue; - - Set<Object> tags = task.getTags(); - - int categoryTags = 0, tooFullCategoryTags = 0; - for (Object tag: tags) { - if (category.acceptsTag(tag)) { - categoryTags++; - if (taskTagsInCategoryOverCapacity.containsKey(tag)) - tooFullCategoryTags++; - } - } - if (tooFullCategoryTags>0) { - if (categoryTags==tooFullCategoryTags) { - // all buckets are full, delete this one - tasksToConsiderDeleting.add(task); - } else { - // if any bucket is under capacity, then give grace to the other buckets in this category - for (Object tag: tags) { - if (category.acceptsTag(tag)) { - AtomicInteger over = taskTagsInCategoryOverCapacity.get(tag); - if (over!=null) { - if (over.decrementAndGet()<=0) { - // and remove it from over-capacity if so - taskTagsInCategoryOverCapacity.remove(tag); - if (taskTagsInCategoryOverCapacity.isEmpty()) - return 0; - } - } - } - } - } - } - } - - } catch (ConcurrentModificationException e) { - // do CME's happen with these data structures? - // if so, let's just delete what we've found so far - LOG.debug("Got CME inspecting tasks, with "+tasksToConsiderDeleting.size()+" found for deletion: "+e); - } - - if (LOG.isDebugEnabled()) - LOG.debug("brooklyn-gc detected "+taskTagsInCategoryOverCapacity.size()+" "+category+" " - + "tags over capacity, expiring old tasks; " - + tasksToConsiderDeleting.size()+" tasks under consideration; categories are: " - + taskTagsInCategoryOverCapacity); - - Collections.sort(tasksToConsiderDeleting, TASKS_OLDEST_FIRST_COMPARATOR); - // now try deleting tasks which are overcapacity for each (non-entity) tag - int deleted = 0; - for (Task<?> task: tasksToConsiderDeleting) { - boolean delete = true; - for (Object tag: task.getTags()) { - if (!category.acceptsTag(tag)) - continue; - if (taskTagsInCategoryOverCapacity.get(tag)==null) { - // no longer over capacity in this tag - delete = false; - break; - } - } - if (delete) { - // delete this and update overcapacity info - deleted++; - executionManager.deleteTask(task); - for (Object tag: task.getTags()) { - AtomicInteger counter = taskAllTagsOverCapacity.get(tag); - if (counter!=null && counter.decrementAndGet()<=0) - taskTagsInCategoryOverCapacity.remove(tag); - } - if (LOG.isTraceEnabled()) - LOG.trace("brooklyn-gc deleted "+task+", buckets now "+taskTagsInCategoryOverCapacity); - if (taskTagsInCategoryOverCapacity.isEmpty()) - break; - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; " - + "capacities now: " + taskTagsInCategoryOverCapacity); - return deleted; - } - - protected int expireIfOverCapacityGlobally() { - Collection<Task<?>> tasksLive = executionManager.allTasksLive(); - if (tasksLive.size() <= brooklynProperties.getConfig(MAX_TASKS_GLOBAL)) - return 0; - LOG.debug("brooklyn-gc detected "+tasksLive.size()+" tasks in memory, over global limit, looking at deleting some"); - - try { - tasksLive = MutableList.copyOf(tasksLive); - } catch (ConcurrentModificationException e) { - tasksLive = executionManager.getTasksWithAllTags(MutableList.of()); - } - - MutableList<Task<?>> tasks = MutableList.of(); - for (Task<?> task: tasksLive) { - if (task.isDone()) { - tasks.add(task); - } - } - - int numToDelete = tasks.size() - brooklynProperties.getConfig(MAX_TASKS_GLOBAL); - if (numToDelete <= 0) { - LOG.debug("brooklyn-gc detected only "+tasks.size()+" completed tasks in memory, not over global limit, so not deleting any"); - return 0; - } - - Collections.sort(tasks, TASKS_OLDEST_FIRST_COMPARATOR); - - int numDeleted = 0; - while (numDeleted < numToDelete && tasks.size()>numDeleted) { - executionManager.deleteTask( tasks.get(numDeleted++) ); - } - if (LOG.isDebugEnabled()) - LOG.debug("brooklyn-gc deleted "+numDeleted+" tasks as was over global limit, now have "+executionManager.allTasksLive().size()); - return numDeleted; - } - -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagementMode.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagementMode.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagementMode.java deleted file mode 100644 index 4c56515..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagementMode.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -/** Indicates how an entity/location/adjunct is treated at a given {@link ManagementContext} */ -public enum BrooklynObjectManagementMode { - /** item does not exist, not in memory, nor persisted (e.g. creating for first time, or finally destroying) */ - NONEXISTENT, - /** item exists or existed elsewhere, i.e. there is persisted state, but is not loaded here */ - UNMANAGED_PERSISTED, - /** item is loaded but read-only (ie not actively managed here) */ - LOADED_READ_ONLY, - /** item is actively managed here */ - MANAGED_PRIMARY -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagerInternal.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagerInternal.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagerInternal.java deleted file mode 100644 index db93b60..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagerInternal.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import org.apache.brooklyn.api.objs.BrooklynObject; - -public interface BrooklynObjectManagerInternal<T extends BrooklynObject> { - - ManagementTransitionMode getLastManagementTransitionMode(String itemId); - void setManagementTransitionMode(T item, ManagementTransitionMode mode); - - /** - * Begins management for the given rebinded root, recursively; - * if rebinding as a read-only copy, {@link #setReadOnly(T, boolean)} should be called prior to this. - */ - void manageRebindedRoot(T item); - - void unmanage(final T e, final ManagementTransitionMode info); - -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynShutdownHooks.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynShutdownHooks.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynShutdownHooks.java deleted file mode 100644 index 91ca5dc..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynShutdownHooks.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.mgmt.ManagementContext; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.core.entity.trait.Startable; -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.collections.MutableSet; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; -import org.apache.brooklyn.util.javalang.Threads; -import org.apache.brooklyn.util.time.CountdownTimer; -import org.apache.brooklyn.util.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; - -public class BrooklynShutdownHooks { - - private static final Logger log = LoggerFactory.getLogger(BrooklynShutdownHooks.class); - - private static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration.TWO_MINUTES; - - private static final AtomicBoolean isShutdownHookRegistered = new AtomicBoolean(); - private static final List<Entity> entitiesToStopOnShutdown = Lists.newArrayList(); - private static final List<ManagementContext> managementContextsToStopAppsOnShutdown = Lists.newArrayList(); - private static final List<ManagementContext> managementContextsToTerminateOnShutdown = Lists.newArrayList(); - private static final AtomicBoolean isShutDown = new AtomicBoolean(false); - -// private static final Object mutex = new Object(); - private static final Semaphore semaphore = new Semaphore(1); - - /** - * Max time to wait for shutdown to complete, when stopping the entities from {@link #invokeStopOnShutdown(Entity)}. - * Default is two minutes - deliberately long because stopping cloud VMs can often take a minute. - */ - private static volatile Duration shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT; - - public static void setShutdownTimeout(Duration val) { - shutdownTimeout = val; - } - - public static void invokeStopOnShutdown(Entity entity) { - if (!(entity instanceof Startable)) { - log.warn("Not adding entity {} for stop-on-shutdown as not an instance of {}", entity, Startable.class.getSimpleName()); - return; - } - try { - semaphore.acquire(); - if (isShutDown.get()) { - semaphore.release(); - try { - log.warn("Call to invokeStopOnShutdown for "+entity+" while system already shutting down; invoking stop now and throwing exception"); - Entities.destroy(entity); - throw new IllegalStateException("Call to invokeStopOnShutdown for "+entity+" while system already shutting down"); - } catch (Exception e) { - throw new IllegalStateException("Call to invokeStopOnShutdown for "+entity+" while system already shutting down, had error: "+e, e); - } - } - - try { - // TODO should be a weak reference in case it is destroyed before shutdown - // (only applied to certain entities started via launcher so not a big leak) - entitiesToStopOnShutdown.add(entity); - } finally { - semaphore.release(); - } - addShutdownHookIfNotAlready(); - - } catch (Exception e) { - throw Exceptions.propagate(e); - } - } - - public static void invokeStopAppsOnShutdown(ManagementContext managementContext) { - try { - semaphore.acquire(); - if (isShutDown.get()) { - semaphore.release(); - try { - log.warn("Call to invokeStopAppsOnShutdown for "+managementContext+" while system already shutting down; invoking stop now and throwing exception"); - destroyAndWait(managementContext.getApplications(), shutdownTimeout); - - throw new IllegalStateException("Call to invokeStopAppsOnShutdown for "+managementContext+" while system already shutting down"); - } catch (Exception e) { - throw new IllegalStateException("Call to invokeStopAppsOnShutdown for "+managementContext+" while system already shutting down, had error: "+e, e); - } - } - - // TODO weak reference, as per above - managementContextsToStopAppsOnShutdown.add(managementContext); - semaphore.release(); - addShutdownHookIfNotAlready(); - - } catch (Exception e) { - throw Exceptions.propagate(e); - } - } - - public static void invokeTerminateOnShutdown(ManagementContext managementContext) { - try { - semaphore.acquire(); - if (isShutDown.get()) { - semaphore.release(); - try { - log.warn("Call to invokeStopOnShutdown for "+managementContext+" while system already shutting down; invoking stop now and throwing exception"); - ((ManagementContextInternal)managementContext).terminate(); - throw new IllegalStateException("Call to invokeTerminateOnShutdown for "+managementContext+" while system already shutting down"); - } catch (Exception e) { - throw new IllegalStateException("Call to invokeTerminateOnShutdown for "+managementContext+" while system already shutting down, had error: "+e, e); - } - } - - // TODO weak reference, as per above - managementContextsToTerminateOnShutdown.add(managementContext); - semaphore.release(); - addShutdownHookIfNotAlready(); - - } catch (Exception e) { - throw Exceptions.propagate(e); - } - } - - private static void addShutdownHookIfNotAlready() { - if (isShutdownHookRegistered.compareAndSet(false, true)) { - Threads.addShutdownHook(BrooklynShutdownHookJob.newInstanceForReal()); - } - } - - @VisibleForTesting - public static class BrooklynShutdownHookJob implements Runnable { - - final boolean setStaticShutDownFlag; - - private BrooklynShutdownHookJob(boolean setStaticShutDownFlag) { - this.setStaticShutDownFlag = setStaticShutDownFlag; - } - - public static BrooklynShutdownHookJob newInstanceForReal() { - return new BrooklynShutdownHookJob(true); - } - - /** testing instance does not actually set the `isShutDown` bit */ - public static BrooklynShutdownHookJob newInstanceForTesting() { - return new BrooklynShutdownHookJob(false); - } - - @Override - public void run() { - // First stop entities; on interrupt, abort waiting for tasks - but let shutdown hook continue - Set<Entity> entitiesToStop = MutableSet.of(); - try { - semaphore.acquire(); - if (setStaticShutDownFlag) - isShutDown.set(true); - semaphore.release(); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - entitiesToStop.addAll(entitiesToStopOnShutdown); - for (ManagementContext mgmt: managementContextsToStopAppsOnShutdown) { - if (mgmt.isRunning()) { - entitiesToStop.addAll(mgmt.getApplications()); - } - } - - if (entitiesToStop.isEmpty()) { - log.debug("Brooklyn shutdown: no entities to stop"); - } else { - log.info("Brooklyn shutdown: stopping entities "+entitiesToStop); - destroyAndWait(entitiesToStop, shutdownTimeout); - } - - // Then terminate management contexts - log.debug("Brooklyn terminateOnShutdown shutdown-hook invoked: terminating management contexts: "+managementContextsToTerminateOnShutdown); - for (ManagementContext managementContext: managementContextsToTerminateOnShutdown) { - try { - if (!managementContext.isRunning()) - continue; - ((ManagementContextInternal)managementContext).terminate(); - } catch (RuntimeException e) { - log.info("terminateOnShutdown of "+managementContext+" returned error (continuing): "+e, e); - } - } - } - } - - protected static void destroyAndWait(Iterable<? extends Entity> entitiesToStop, Duration timeout) { - MutableList<Task<?>> stops = MutableList.of(); - for (Entity entityToStop: entitiesToStop) { - final Entity entity = entityToStop; - if (!Entities.isManaged(entity)) continue; - Task<Object> t = Tasks.builder().dynamic(false).displayName("destroying "+entity).body(new Runnable() { - @Override public void run() { Entities.destroy(entity); } - }).build(); - stops.add( ((EntityInternal)entity).getExecutionContext().submit(t) ); - } - CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout); - for (Task<?> t: stops) { - try { - Duration durationRemaining = timer.getDurationRemaining(); - Object result = t.getUnchecked(durationRemaining.isPositive() ? durationRemaining : Duration.ONE_MILLISECOND); - if (log.isDebugEnabled()) log.debug("stopOnShutdown of {} completed: {}", t, result); - } catch (RuntimeInterruptedException e) { - Thread.currentThread().interrupt(); - if (log.isDebugEnabled()) log.debug("stopOnShutdown of "+t+" interrupted: "+e); - break; - } catch (RuntimeException e) { - Exceptions.propagateIfFatal(e); - log.warn("Shutdown hook "+t+" returned error (continuing): "+e); - if (log.isDebugEnabled()) log.debug("stopOnShutdown of "+t+" returned error (continuing to stop others): "+e, e); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CampYamlParser.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CampYamlParser.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CampYamlParser.java deleted file mode 100644 index 35841be..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CampYamlParser.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import java.util.Map; - -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; - -public interface CampYamlParser { - - ConfigKey<CampYamlParser> YAML_PARSER_KEY = ConfigKeys.newConfigKey(CampYamlParser.class, "brooklyn.camp.yamlParser"); - - Map<String, Object> parse(Map<String, Object> map); - - Object parse(String val); - -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CollectionChangeListener.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CollectionChangeListener.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CollectionChangeListener.java deleted file mode 100644 index 7aa700f..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CollectionChangeListener.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -public interface CollectionChangeListener<Item> { - void onItemAdded(Item item); - void onItemRemoved(Item item); -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/DeferredBrooklynProperties.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/DeferredBrooklynProperties.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/DeferredBrooklynProperties.java deleted file mode 100644 index ae0c7a5..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/DeferredBrooklynProperties.java +++ /dev/null @@ -1,370 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.File; -import java.io.InputStream; -import java.net.URL; -import java.util.Collection; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; - -import org.apache.brooklyn.api.mgmt.ExecutionContext; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.config.ConfigKey.HasConfigKey; -import org.apache.brooklyn.core.internal.BrooklynProperties; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.flags.TypeCoercions; -import org.apache.brooklyn.util.core.task.DeferredSupplier; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.guava.Maybe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Predicate; -import com.google.common.collect.Maps; - -/** - * Delegates to another {@link BrooklynProperties} implementation, but intercepts all calls to get. - * The results are transformed: if they are in the external-config format then they are - * automatically converted to {@link DeferredSupplier}. - * - * The external-config format is that same as that for camp-yaml blueprints (i.e. - * {@code $brooklyn:external("myprovider", "mykey")}. - */ -public class DeferredBrooklynProperties implements BrooklynProperties { - - private static final Logger LOG = LoggerFactory.getLogger(DeferredBrooklynProperties.class); - - private static final String BROOKLYN_YAML_PREFIX = "$brooklyn:"; - - private final BrooklynProperties delegate; - private final ManagementContextInternal mgmt; - - public DeferredBrooklynProperties(BrooklynProperties delegate, ManagementContextInternal mgmt) { - this.delegate = checkNotNull(delegate, "delegate"); - this.mgmt = checkNotNull(mgmt, "mgmt"); - } - - private Object transform(ConfigKey<?> key, Object value) { - if (value instanceof CharSequence) { - String raw = value.toString(); - if (raw.startsWith(BROOKLYN_YAML_PREFIX)) { - CampYamlParser parser = mgmt.getConfig().getConfig(CampYamlParser.YAML_PARSER_KEY); - if (parser == null) { - // TODO Should we fail or return the untransformed value? - // Problem is this gets called during initialisation, e.g. by BrooklynFeatureEnablement calling asMapWithStringKeys() - // throw new IllegalStateException("Cannot parse external-config for "+key+" because no camp-yaml parser available"); - LOG.debug("Not transforming external-config {}, as no camp-yaml parser available", key); - return value; - } - return parser.parse(raw); - } - } - return value; - } - - private <T> T resolve(ConfigKey<T> key, Object value) { - Object transformed = transform(key, value); - - Object result; - if (transformed instanceof DeferredSupplier) { - ExecutionContext exec = mgmt.getServerExecutionContext(); - try { - result = Tasks.resolveValue(transformed, key.getType(), exec); - } catch (ExecutionException | InterruptedException e) { - throw Exceptions.propagate(e); - } - } else { - result = transformed; - } - - return TypeCoercions.coerce(result, key.getTypeToken()); - } - - @Override - public <T> T getConfig(ConfigKey<T> key) { - T raw = delegate.getConfig(key); - return resolve(key, raw); - } - - @Override - public <T> T getConfig(HasConfigKey<T> key) { - T raw = delegate.getConfig(key); - return resolve(key.getConfigKey(), raw); - } - - @Override - public <T> T getConfig(HasConfigKey<T> key, T defaultValue) { - T raw = delegate.getConfig(key, defaultValue); - return resolve(key.getConfigKey(), raw); - } - - @Override - public <T> T getConfig(ConfigKey<T> key, T defaultValue) { - T raw = delegate.getConfig(key, defaultValue); - return resolve(key, raw); - } - - @Deprecated - @Override - public Object getRawConfig(ConfigKey<?> key) { - return transform(key, delegate.getRawConfig(key)); - } - - @Override - public Maybe<Object> getConfigRaw(ConfigKey<?> key, boolean includeInherited) { - Maybe<Object> result = delegate.getConfigRaw(key, includeInherited); - return (result.isPresent()) ? Maybe.of(transform(key, result.get())) : Maybe.absent(); - } - - @Override - public Map<ConfigKey<?>, Object> getAllConfig() { - Map<ConfigKey<?>, Object> raw = delegate.getAllConfig(); - Map<ConfigKey<?>, Object> result = Maps.newLinkedHashMap(); - for (Map.Entry<ConfigKey<?>, Object> entry : raw.entrySet()) { - result.put(entry.getKey(), transform(entry.getKey(), entry.getValue())); - } - return result; - } - - @Override - public Map<String, Object> asMapWithStringKeys() { - Map<ConfigKey<?>, Object> raw = delegate.getAllConfig(); - Map<String, Object> result = Maps.newLinkedHashMap(); - for (Map.Entry<ConfigKey<?>, Object> entry : raw.entrySet()) { - result.put(entry.getKey().getName(), transform(entry.getKey(), entry.getValue())); - } - return result; - } - - /** - * Discouraged; returns the String so if it is external config, it will be the - * {@code $brooklyn:external(...)} format. - */ - @Override - @SuppressWarnings("rawtypes") - @Deprecated - public String get(Map flags, String key) { - return delegate.get(flags, key); - } - - /** - * Discouraged; returns the String so if it is external config, it will be the - * {@code $brooklyn:external(...)} format. - */ - @Override - public String getFirst(String ...keys) { - return delegate.getFirst(keys); - } - - /** - * Discouraged; returns the String so if it is external config, it will be the - * {@code $brooklyn:external(...)} format. - */ - @Override - @SuppressWarnings("rawtypes") - public String getFirst(Map flags, String ...keys) { - return delegate.getFirst(flags, keys); - } - - @Override - public BrooklynProperties submap(Predicate<ConfigKey<?>> filter) { - BrooklynProperties submap = delegate.submap(filter); - return new DeferredBrooklynProperties(submap, mgmt); - } - - @Override - public BrooklynProperties addEnvironmentVars() { - delegate.addEnvironmentVars(); - return this; - } - - @Override - public BrooklynProperties addSystemProperties() { - delegate.addSystemProperties(); - return this; - } - - @Override - public BrooklynProperties addFrom(ConfigBag cfg) { - delegate.addFrom(cfg); - return this; - } - - @Override - @SuppressWarnings("rawtypes") - public BrooklynProperties addFrom(Map map) { - delegate.addFrom(map); - return this; - } - - @Override - public BrooklynProperties addFrom(InputStream i) { - delegate.addFrom(i); - return this; - } - - @Override - public BrooklynProperties addFrom(File f) { - delegate.addFrom(f); - return this; - } - - @Override - public BrooklynProperties addFrom(URL u) { - delegate.addFrom(u); - return this; - } - - @Override - public BrooklynProperties addFromUrl(String url) { - delegate.addFromUrl(url); - return this; - } - - @Override - public BrooklynProperties addFromUrlProperty(String urlProperty) { - delegate.addFromUrlProperty(urlProperty); - return this; - } - - @Override - @SuppressWarnings("rawtypes") - public BrooklynProperties addFromMap(Map properties) { - delegate.addFromMap(properties); - return this; - } - - @Override - public boolean putIfAbsent(String key, Object value) { - return delegate.putIfAbsent(key, value); - } - - @Override - public String toString() { - return delegate.toString(); - } - - @Override - public Object put(Object key, Object value) { - return delegate.put(key, value); - } - - @Override - @SuppressWarnings("rawtypes") - public void putAll(Map vals) { - delegate.putAll(vals); - } - - @Override - public <T> Object put(HasConfigKey<T> key, T value) { - return delegate.put(key, value); - } - - @Override - public <T> Object put(ConfigKey<T> key, T value) { - return delegate.put(key, value); - } - - @Override - public <T> boolean putIfAbsent(ConfigKey<T> key, T value) { - return delegate.putIfAbsent(key, value); - } - - - ////////////////////////////////////////////////////////////////////////////////// - // Methods below from java.util.LinkedHashMap, which BrooklynProperties extends // - ////////////////////////////////////////////////////////////////////////////////// - - @Override - public int size() { - return delegate.size(); - } - - @Override - public boolean isEmpty() { - return delegate.isEmpty(); - } - - @Override - public boolean containsKey(Object key) { - return delegate.containsKey(key); - } - - @Override - public boolean containsValue(Object value) { - return delegate.containsValue(value); - } - - @Override - public Object get(Object key) { - return delegate.get(key); - } - - @Override - public Object remove(Object key) { - return delegate.remove(key); - } - - @Override - public void clear() { - delegate.clear(); - } - - @Override - @SuppressWarnings("rawtypes") - public Set keySet() { - return delegate.keySet(); - } - - @Override - @SuppressWarnings("rawtypes") - public Collection values() { - return delegate.values(); - } - - @Override - @SuppressWarnings({ "unchecked", "rawtypes" }) - public Set<Map.Entry> entrySet() { - return delegate.entrySet(); - } - - @Override - public boolean equals(Object o) { - return delegate.equals(o); - } - - @Override - public int hashCode() { - return delegate.hashCode(); - } - - // put(Object, Object) already overridden - //@Override - //public Object put(Object key, Object value) { - - // putAll(Map) already overridden - //@Override - //public void putAll(Map m) { -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EffectorUtils.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EffectorUtils.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EffectorUtils.java deleted file mode 100644 index f8bb7cb..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EffectorUtils.java +++ /dev/null @@ -1,363 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import static org.apache.brooklyn.util.groovy.GroovyJavaMethods.truth; - -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; - -import org.apache.brooklyn.api.effector.Effector; -import org.apache.brooklyn.api.effector.ParameterType; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.core.effector.BasicParameterType; -import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.flags.TypeCoercions; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.exceptions.PropagatedRuntimeException; -import org.apache.brooklyn.util.guava.Maybe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * Utility methods for invoking effectors. - */ -public class EffectorUtils { - - private static final Logger log = LoggerFactory.getLogger(EffectorUtils.class); - - /** prepares arguments for an effector either accepting: - * an array, which should contain the arguments in order, optionally omitting those which have defaults defined; - * or a map, which should contain the arguments by name, again optionally omitting those which have defaults defined, - * and in this case also performing type coercion. - */ - public static Object[] prepareArgsForEffector(Effector<?> eff, Object args) { - if (args != null && args.getClass().isArray()) { - return prepareArgsForEffectorFromArray(eff, (Object[]) args); - } - if (args instanceof Map) { - return prepareArgsForEffectorFromMap(eff, (Map) args); - } - log.warn("Deprecated effector invocation style for call to "+eff+", expecting a map or an array, got: "+args); - if (log.isDebugEnabled()) { - log.debug("Deprecated effector invocation style for call to "+eff+", expecting a map or an array, got: "+args, - new Throwable("Trace for deprecated effector invocation style")); - } - return oldPrepareArgsForEffector(eff, args); - } - - /** method used for calls such as entity.effector(arg1, arg2) - * get routed here from AbstractEntity.invokeMethod */ - private static Object[] prepareArgsForEffectorFromArray(Effector<?> eff, Object args[]) { - int newArgsNeeded = eff.getParameters().size(); - if (args.length==1 && args[0] instanceof Map) { - if (newArgsNeeded!=1 || !eff.getParameters().get(0).getParameterClass().isAssignableFrom(args[0].getClass())) { - // treat a map in an array as a map passed directly (unless the method takes a single-arg map) - // this is to support effector(param1: val1) - return prepareArgsForEffectorFromMap(eff, (Map) args[0]); - } - } - return prepareArgsForEffectorAsMapFromArray(eff, args).values().toArray(new Object[0]); - } - - public static Map prepareArgsForEffectorAsMapFromArray(Effector<?> eff, Object args[]) { - int newArgsNeeded = eff.getParameters().size(); - List l = Lists.newArrayList(); - l.addAll(Arrays.asList(args)); - Map newArgs = new LinkedHashMap(); - - for (int index = 0; index < eff.getParameters().size(); index++) { - ParameterType<?> it = eff.getParameters().get(index); - - if (l.size() >= newArgsNeeded) { - //all supplied (unnamed) arguments must be used; ignore map - newArgs.put(it.getName(), l.remove(0)); - // TODO do we ignore arguments in the same order that groovy does? - } else if (!l.isEmpty() && it.getParameterClass().isInstance(l.get(0))) { - //if there are parameters supplied, and type is correct, they get applied before default values - //(this is akin to groovy) - newArgs.put(it.getName(), l.remove(0)); - } else if (it instanceof BasicParameterType && ((BasicParameterType)it).hasDefaultValue()) { - //finally, default values are used to make up for missing parameters - newArgs.put(it.getName(), ((BasicParameterType)it).getDefaultValue()); - } else { - throw new IllegalArgumentException("Invalid arguments (count mismatch) for effector "+eff+": "+args); - } - - newArgsNeeded--; - } - if (newArgsNeeded > 0) { - throw new IllegalArgumentException("Invalid arguments (missing "+newArgsNeeded+") for effector "+eff+": "+args); - } - if (!l.isEmpty()) { - throw new IllegalArgumentException("Invalid arguments ("+l.size()+" extra) for effector "+eff+": "+args); - } - return newArgs; - } - - private static Object[] prepareArgsForEffectorFromMap(Effector<?> eff, Map m) { - m = Maps.newLinkedHashMap(m); //make editable copy - List newArgs = Lists.newArrayList(); - int newArgsNeeded = eff.getParameters().size(); - - for (int index = 0; index < eff.getParameters().size(); index++) { - ParameterType<?> it = eff.getParameters().get(index); - Object v; - if (truth(it.getName()) && m.containsKey(it.getName())) { - // argument is in the map - v = m.remove(it.getName()); - } else if (it instanceof BasicParameterType && ((BasicParameterType)it).hasDefaultValue()) { - //finally, default values are used to make up for missing parameters - v = ((BasicParameterType)it).getDefaultValue(); - } else { - throw new IllegalArgumentException("Invalid arguments (missing argument "+it+") for effector "+eff+": "+m); - } - - newArgs.add(TypeCoercions.coerce(v, it.getParameterClass())); - newArgsNeeded--; - } - if (newArgsNeeded>0) - throw new IllegalArgumentException("Invalid arguments (missing "+newArgsNeeded+") for effector "+eff+": "+m); - if (!m.isEmpty()) { - log.warn("Unsupported parameter to "+eff+" (ignoring): "+m); - } - return newArgs.toArray(new Object[newArgs.size()]); - } - - /** - * Takes arguments, and returns an array of arguments suitable for use by the Effector - * according to the ParameterTypes it exposes. - * <p> - * The args can be: - * <ol> - * <li>an array of ordered arguments - * <li>a collection (which will be automatically converted to an array) - * <li>a single argument (which will then be wrapped in an array) - * <li>a map containing the (named) arguments - * <li>an array or collection single entry of a map (treated same as 5 above) - * <li>a semi-populated array or collection that also containing a map as first arg - - * uses ordered args in array, but uses named values from map in preference. - * <li>semi-populated array or collection, where default values will otherwise be used. - * </ol> - */ - public static Object[] oldPrepareArgsForEffector(Effector<?> eff, Object args) { - //attempt to coerce unexpected types - Object[] argsArray; - if (args==null) { - argsArray = new Object[0]; - } else if (args.getClass().isArray()) { - argsArray = (Object[]) args; - } else { - if (args instanceof Collection) { - argsArray = ((Collection) args).toArray(new Object[((Collection) args).size()]); - } else { - argsArray = new Object[] { args }; - } - } - - //if args starts with a map, assume it contains the named arguments - //(but only use it when we have insufficient supplied arguments) - List l = Lists.newArrayList(); - l.addAll(Arrays.asList(argsArray)); - Map m = (argsArray.length > 0 && argsArray[0] instanceof Map ? Maps.newLinkedHashMap((Map) l.remove(0)) : null); - List newArgs = Lists.newArrayList(); - int newArgsNeeded = eff.getParameters().size(); - boolean mapUsed = false; - - for (int index = 0; index < eff.getParameters().size(); index++) { - ParameterType<?> it = eff.getParameters().get(index); - - if (l.size() >= newArgsNeeded) { - //all supplied (unnamed) arguments must be used; ignore map - newArgs.add(l.remove(0)); - } else if (truth(m) && truth(it.getName()) && m.containsKey(it.getName())) { - //some arguments were not supplied, and this one is in the map - newArgs.add(m.remove(it.getName())); - } else if (index == 0 && Map.class.isAssignableFrom(it.getParameterClass())) { - //if first arg is a map it takes the supplied map - newArgs.add(m); - mapUsed = true; - } else if (!l.isEmpty() && it.getParameterClass().isInstance(l.get(0))) { - //if there are parameters supplied, and type is correct, they get applied before default values - //(this is akin to groovy) - newArgs.add(l.remove(0)); - } else if (it instanceof BasicParameterType && ((BasicParameterType)it).hasDefaultValue()) { - //finally, default values are used to make up for missing parameters - newArgs.add(((BasicParameterType)it).getDefaultValue()); - } else { - throw new IllegalArgumentException("Invalid arguments (count mismatch) for effector "+eff+": "+args); - } - - newArgsNeeded--; - } - if (newArgsNeeded > 0) { - throw new IllegalArgumentException("Invalid arguments (missing "+newArgsNeeded+") for effector "+eff+": "+args); - } - if (!l.isEmpty()) { - throw new IllegalArgumentException("Invalid arguments ("+l.size()+" extra) for effector "+eff+": "+args); - } - if (truth(m) && !mapUsed) { - throw new IllegalArgumentException("Invalid arguments ("+m.size()+" extra named) for effector "+eff+": "+args); - } - return newArgs.toArray(new Object[newArgs.size()]); - } - - /** - * Invokes a method effector so that its progress is tracked. For internal use only, when we know the effector is backed by a method which is local. - */ - public static <T> T invokeMethodEffector(Entity entity, Effector<T> eff, Object[] args) { - String name = eff.getName(); - - try { - if (log.isDebugEnabled()) log.debug("Invoking effector {} on {}", new Object[] {name, entity}); - if (log.isTraceEnabled()) log.trace("Invoking effector {} on {} with args {}", new Object[] {name, entity, args}); - EntityManagementSupport mgmtSupport = ((EntityInternal)entity).getManagementSupport(); - if (!mgmtSupport.isDeployed()) { - mgmtSupport.attemptLegacyAutodeployment(name); - } - ManagementContextInternal mgmtContext = (ManagementContextInternal) ((EntityInternal) entity).getManagementContext(); - - mgmtSupport.getEntityChangeListener().onEffectorStarting(eff, args); - try { - return mgmtContext.invokeEffectorMethodSync(entity, eff, args); - } finally { - mgmtSupport.getEntityChangeListener().onEffectorCompleted(eff); - } - } catch (Exception e) { - handleEffectorException(entity, eff, e); - // (won't return below) - return null; - } - } - - public static void handleEffectorException(Entity entity, Effector<?> effector, Throwable throwable) { - String message = "Error invoking " + effector.getName() + " at " + entity; - // Avoid throwing a PropagatedRuntimeException that just repeats the last PropagatedRuntimeException. - if (throwable instanceof PropagatedRuntimeException && - throwable.getMessage() != null && - throwable.getMessage().startsWith(message)) { - throw PropagatedRuntimeException.class.cast(throwable); - } else { - log.warn(message + ": " + Exceptions.collapseText(throwable)); - throw new PropagatedRuntimeException(message, throwable); - } - } - - public static <T> Task<T> invokeEffectorAsync(Entity entity, Effector<T> eff, Map<String,?> parameters) { - String name = eff.getName(); - - if (log.isDebugEnabled()) log.debug("Invoking-async effector {} on {}", new Object[] { name, entity }); - if (log.isTraceEnabled()) log.trace("Invoking-async effector {} on {} with args {}", new Object[] { name, entity, parameters }); - EntityManagementSupport mgmtSupport = ((EntityInternal)entity).getManagementSupport(); - if (!mgmtSupport.isDeployed()) { - mgmtSupport.attemptLegacyAutodeployment(name); - } - ManagementContextInternal mgmtContext = (ManagementContextInternal) ((EntityInternal)entity).getManagementContext(); - - // FIXME seems brittle to have the listeners in the Utils method; better to move into the context.invokeEff - // (or whatever the last mile before invoking the effector is - though currently there is not such a canonical place!) - mgmtSupport.getEntityChangeListener().onEffectorStarting(eff, parameters); - try { - return mgmtContext.invokeEffector(entity, eff, parameters); - } finally { - // FIXME this is really Effector submitted - mgmtSupport.getEntityChangeListener().onEffectorCompleted(eff); - } - } - - /** @deprecated since 0.7.0, not used */ - @Deprecated - public static Effector<?> findEffectorMatching(Entity entity, Method method) { - outer: for (Effector<?> effector : entity.getEntityType().getEffectors()) { - if (!effector.getName().equals(entity)) continue; - if (effector.getParameters().size() != method.getParameterTypes().length) continue; - for (int i = 0; i < effector.getParameters().size(); i++) { - if (effector.getParameters().get(i).getParameterClass() != method.getParameterTypes()[i]) continue outer; - } - return effector; - } - return null; - } - - /** @deprecated since 0.7.0, expects parameters but does not use them! */ - @Deprecated - public static Effector<?> findEffectorMatching(Set<Effector<?>> effectors, String effectorName, Map<String, ?> parameters) { - // TODO Support overloading: check parameters as well - for (Effector<?> effector : effectors) { - if (effector.getName().equals(effectorName)) { - return effector; - } - } - return null; - } - - /** matches effectors by name only (not parameters) */ - public static Maybe<Effector<?>> findEffector(Collection<? extends Effector<?>> effectors, String effectorName) { - for (Effector<?> effector : effectors) { - if (effector.getName().equals(effectorName)) { - return Maybe.<Effector<?>>of(effector); - } - } - return Maybe.absent(new NoSuchElementException("No effector with name "+effectorName+" (contenders "+effectors+")")); - } - - /** matches effectors by name only (not parameters), based on what is declared on the entity static type */ - public static Maybe<Effector<?>> findEffectorDeclared(Entity entity, String effectorName) { - return findEffector(entity.getEntityType().getEffectors(), effectorName); - } - - /** @deprecated since 0.7.0 use {@link #getTaskFlagsForEffectorInvocation(Entity, Effector, ConfigBag)} */ - public static Map<Object,Object> getTaskFlagsForEffectorInvocation(Entity entity, Effector<?> effector) { - return getTaskFlagsForEffectorInvocation(entity, effector, null); - } - - /** returns a (mutable) map of the standard flags which should be placed on an effector */ - public static Map<Object,Object> getTaskFlagsForEffectorInvocation(Entity entity, Effector<?> effector, ConfigBag parameters) { - List<Object> tags = MutableList.of( - BrooklynTaskTags.EFFECTOR_TAG, - BrooklynTaskTags.tagForEffectorCall(entity, effector.getName(), parameters), - BrooklynTaskTags.tagForTargetEntity(entity)); - if (Entitlements.getEntitlementContext() != null) { - tags.add(BrooklynTaskTags.tagForEntitlement(Entitlements.getEntitlementContext())); - } - return MutableMap.builder() - .put("description", "Invoking effector "+effector.getName() - +" on "+entity.getDisplayName() - +(parameters!=null ? " with parameters "+parameters.getAllConfig() : "")) - .put("displayName", effector.getName()) - .put("tags", tags) - .build(); - } - -}
