http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/internal/LocalEntityManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/LocalEntityManager.java b/core/src/main/java/brooklyn/management/internal/LocalEntityManager.java deleted file mode 100644 index a54c1b1..0000000 --- a/core/src/main/java/brooklyn/management/internal/LocalEntityManager.java +++ /dev/null @@ -1,818 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.management.internal; - -import static com.google.common.base.Preconditions.checkNotNull; -import groovy.util.ObservableList; - -import java.lang.reflect.Proxy; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.WeakHashMap; -import java.util.concurrent.ConcurrentMap; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.entity.Application; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.Group; -import org.apache.brooklyn.api.entity.proxying.EntitySpec; -import org.apache.brooklyn.api.entity.proxying.EntityTypeRegistry; -import org.apache.brooklyn.api.management.AccessController; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.policy.Enricher; -import org.apache.brooklyn.api.policy.EnricherSpec; -import org.apache.brooklyn.api.policy.Policy; -import org.apache.brooklyn.api.policy.PolicySpec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.config.BrooklynLogging; -import brooklyn.entity.basic.AbstractEntity; -import brooklyn.entity.basic.BrooklynTaskTags; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.basic.EntityPredicates; -import brooklyn.entity.proxying.BasicEntityTypeRegistry; -import brooklyn.entity.proxying.EntityProxy; -import brooklyn.entity.proxying.EntityProxyImpl; -import brooklyn.entity.proxying.InternalEntityFactory; -import brooklyn.entity.proxying.InternalPolicyFactory; -import brooklyn.entity.trait.Startable; -import brooklyn.internal.storage.BrooklynStorage; -import brooklyn.util.collections.MutableSet; -import brooklyn.util.collections.SetFromLiveMap; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.task.Tasks; -import brooklyn.util.time.CountdownTimer; -import brooklyn.util.time.Duration; - -import com.google.common.annotations.Beta; 
-import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -public class LocalEntityManager implements EntityManagerInternal { - - private static final Logger log = LoggerFactory.getLogger(LocalEntityManager.class); - - private final LocalManagementContext managementContext; - private final BasicEntityTypeRegistry entityTypeRegistry; - private final InternalEntityFactory entityFactory; - private final InternalPolicyFactory policyFactory; - - /** Entities that have been created, but have not yet begun to be managed */ - protected final Map<String,Entity> preRegisteredEntitiesById = Collections.synchronizedMap(new WeakHashMap<String, Entity>()); - - /** Entities that are in the process of being managed, but where management is not yet complete */ - protected final Map<String,Entity> preManagedEntitiesById = Collections.synchronizedMap(new WeakHashMap<String, Entity>()); - - /** Proxies of the managed entities */ - protected final ConcurrentMap<String,Entity> entityProxiesById = Maps.newConcurrentMap(); - - /** Real managed entities */ - protected final Map<String,Entity> entitiesById = Maps.newLinkedHashMap(); - - /** Management mode for each entity */ - protected final Map<String,ManagementTransitionMode> entityModesById = Collections.synchronizedMap(Maps.<String,ManagementTransitionMode>newLinkedHashMap()); - - /** Proxies of the managed entities */ - protected final ObservableList entities = new ObservableList(); - - /** Proxies of the managed entities that are applications */ - protected final Set<Application> applications = Sets.newConcurrentHashSet(); - - private final BrooklynStorage 
storage; - private final Map<String,String> entityTypes; - private final Set<String> applicationIds; - - public LocalEntityManager(LocalManagementContext managementContext) { - this.managementContext = checkNotNull(managementContext, "managementContext"); - this.storage = managementContext.getStorage(); - this.entityTypeRegistry = new BasicEntityTypeRegistry(); - this.policyFactory = new InternalPolicyFactory(managementContext); - this.entityFactory = new InternalEntityFactory(managementContext, entityTypeRegistry, policyFactory); - - entityTypes = storage.getMap("entities"); - applicationIds = SetFromLiveMap.create(storage.<String,Boolean>getMap("applications")); - } - - public InternalEntityFactory getEntityFactory() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return entityFactory; - } - - public InternalPolicyFactory getPolicyFactory() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return policyFactory; - } - - @Override - public EntityTypeRegistry getEntityTypeRegistry() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return entityTypeRegistry; - } - - @SuppressWarnings("unchecked") - @Override - public <T extends Entity> T createEntity(EntitySpec<T> spec) { - try { - T entity = entityFactory.createEntity(spec); - Entity proxy = ((AbstractEntity)entity).getProxy(); - return (T) checkNotNull(proxy, "proxy for entity %s, spec %s", entity, spec); - } catch (Throwable e) { - log.warn("Failed to create entity using spec "+spec+" (rethrowing)", e); - throw Exceptions.propagate(e); - } - } - - @Override - public <T extends Entity> T createEntity(Map<?,?> config, Class<T> type) { - return createEntity(EntitySpec.create(config, type)); - } - - @Override - public <T extends Policy> T createPolicy(PolicySpec<T> spec) { - try { - return policyFactory.createPolicy(spec); - } catch (Throwable e) { - log.warn("Failed to create 
policy using spec "+spec+" (rethrowing)", e); - throw Exceptions.propagate(e); - } - } - - @Override - public <T extends Enricher> T createEnricher(EnricherSpec<T> spec) { - try { - return policyFactory.createEnricher(spec); - } catch (Throwable e) { - log.warn("Failed to create enricher using spec "+spec+" (rethrowing)", e); - throw Exceptions.propagate(e); - } - } - - @Override - public Collection<Entity> getEntities() { - return ImmutableList.copyOf(entityProxiesById.values()); - } - - @Override - public Collection<String> getEntityIds() { - return ImmutableList.copyOf(entityProxiesById.keySet()); - } - - @Override - public Collection<Entity> getEntitiesInApplication(Application application) { - Predicate<Entity> predicate = EntityPredicates.applicationIdEqualTo(application.getId()); - return ImmutableList.copyOf(Iterables.filter(entityProxiesById.values(), predicate)); - } - - @Override - public Collection<Entity> findEntities(Predicate<? super Entity> filter) { - return ImmutableList.copyOf(Iterables.filter(entityProxiesById.values(), filter)); - } - - @Override - public Collection<Entity> findEntitiesInApplication(Application application, Predicate<? 
super Entity> filter) { - Predicate<Entity> predicate = Predicates.and(EntityPredicates.applicationIdEqualTo(application.getId()), filter); - return ImmutableList.copyOf(Iterables.filter(entityProxiesById.values(), predicate)); - } - - @Override - public Iterable<Entity> getAllEntitiesInApplication(Application application) { - Predicate<Entity> predicate = EntityPredicates.applicationIdEqualTo(application.getId()); - Iterable<Entity> allentities = Iterables.concat(preRegisteredEntitiesById.values(), preManagedEntitiesById.values(), entityProxiesById.values()); - Iterable<Entity> result = Iterables.filter(allentities, predicate); - return ImmutableSet.copyOf(Iterables.transform(result, new Function<Entity, Entity>() { - @Override public Entity apply(Entity input) { - return Entities.proxy(input); - }})); - } - - @Override - public Entity getEntity(String id) { - return entityProxiesById.get(id); - } - - Collection<Application> getApplications() { - return ImmutableList.copyOf(applications); - } - - @Override - public boolean isManaged(Entity e) { - return (isRunning() && getEntity(e.getId()) != null); - } - - boolean isPreRegistered(Entity e) { - return preRegisteredEntitiesById.containsKey(e.getId()); - } - - void prePreManage(Entity entity) { - if (isPreRegistered(entity)) { - log.warn(""+this+" redundant call to pre-pre-manage entity "+entity+"; skipping", - new Exception("source of duplicate pre-pre-manage of "+entity)); - return; - } - preRegisteredEntitiesById.put(entity.getId(), entity); - } - - @Override - public ManagementTransitionMode getLastManagementTransitionMode(String itemId) { - return entityModesById.get(itemId); - } - - @Override - public void setManagementTransitionMode(Entity item, ManagementTransitionMode mode) { - entityModesById.put(item.getId(), mode); - } - - // TODO synchronization issues here. 
We guard with isManaged(), but if another thread executing - // concurrently then the managed'ness could be set after our check but before we do - // onManagementStarting etc. However, we can't just synchronize because we're calling alien code - // (the user might override entity.onManagementStarting etc). - // - // TODO We need to do some check about isPreManaged - i.e. is there another thread (or is this a - // re-entrant call) where the entity is not yet full managed (i.e. isManaged==false) but we're in - // the middle of managing it. - // - // TODO Also see LocalLocationManager.manage(Entity), if fixing things here - @Override - public void manage(Entity e) { - if (isManaged(e)) { - log.warn(""+this+" redundant call to start management of entity (and descendants of) "+e+"; skipping", - new Exception("source of duplicate management of "+e)); - return; - } - manageRecursive(e, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.NONEXISTENT, BrooklynObjectManagementMode.MANAGED_PRIMARY)); - } - - @Override - public void manageRebindedRoot(Entity item) { - ManagementTransitionMode mode = getLastManagementTransitionMode(item.getId()); - Preconditions.checkNotNull(mode, "Mode not set for rebinding %s", item); - manageRecursive(item, mode); - } - - protected void checkManagementAllowed(Entity item) { - AccessController.Response access = managementContext.getAccessController().canManageEntity(item); - if (!access.isAllowed()) { - throw new IllegalStateException("Access controller forbids management of "+item+": "+access.getMsg()); - } - } - - /* TODO we sloppily use "recursive" to ensure ordering of parent-first in many places - * (which may not be necessary but seems like a good idea), - * and also to collect many entities when doing a big rebind, - * ensuring all have #manageNonRecursive called before calling #onManagementStarted. 
- * - * it would be better to have a manageAll(Map<Entity,ManagementTransitionMode> items) - * method which did that in two phases, allowing us to selectively rebind, - * esp when we come to want supporting different modes and different brooklyn nodes. - * - * the impl of manageAll could sort them with parents before children, - * (and manageRecursive could simply populate a map and delegate to manageAll). - * - * manageRebindRoot would then go, and the (few) callers would construct the map. - * - * similarly we might want an unmanageAll(), - * although possibly all unmanagement should be recursive, if we assume an entity's ancestors are always at least proxied - * (and the non-recursive RO path here could maybe be dropped) - */ - - /** Applies management lifecycle callbacks (onManagementStarting, for all beforehand, then onManagementStopped, for all after) */ - protected void manageRecursive(Entity e, final ManagementTransitionMode initialMode) { - checkManagementAllowed(e); - - final List<EntityInternal> allEntities = Lists.newArrayList(); - Predicate<EntityInternal> manageEntity = new Predicate<EntityInternal>() { public boolean apply(EntityInternal it) { - ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); - if (mode==null) { - setManagementTransitionMode(it, mode = initialMode); - } - - Boolean isReadOnlyFromEntity = it.getManagementSupport().isReadOnlyRaw(); - if (isReadOnlyFromEntity==null) { - if (mode.isReadOnly()) { - // should have been marked by rebinder - log.warn("Read-only entity "+it+" not marked as such on call to manage; marking and continuing"); - } - it.getManagementSupport().setReadOnly(mode.isReadOnly()); - } else { - if (!isReadOnlyFromEntity.equals(mode.isReadOnly())) { - log.warn("Read-only status at entity "+it+" ("+isReadOnlyFromEntity+") not consistent with management mode "+mode); - } - } - - if (it.getManagementSupport().isDeployed()) { - if (mode.wasNotLoaded()) { - // silently bail out - return false; - } 
else { - if (mode.wasPrimary() && mode.isPrimary()) { - // active partial rebind; continue - } else if (mode.wasReadOnly() && mode.isReadOnly()) { - // reload in RO mode - } else { - // on initial non-RO rebind, should not have any deployed instances - log.warn("Already deployed "+it+" when managing "+mode+"/"+initialMode+"; ignoring this and all descendants"); - return false; - } - } - } - - // check RO status is consistent - boolean isNowReadOnly = Boolean.TRUE.equals( ((EntityInternal)it).getManagementSupport().isReadOnly() ); - if (mode.isReadOnly()!=isNowReadOnly) { - throw new IllegalStateException("Read-only status mismatch for "+it+": "+mode+" / RO="+isNowReadOnly); - } - - allEntities.add(it); - preManageNonRecursive(it, mode); - it.getManagementSupport().onManagementStarting( new ManagementTransitionInfo(managementContext, mode) ); - return manageNonRecursive(it, mode); - } }; - boolean isRecursive = true; - if (initialMode.wasPrimary() && initialMode.isPrimary()) { - // already managed, so this shouldn't be recursive - // (in ActivePartialRebind we cheat, calling in to this method then skipping recursion). - // it also falls through to here when doing a redundant promotion, - // in that case we *should* be recursive; determine by checking whether a child exists and is preregistered. - // the TODO above removing manageRebindRoot in favour of explicit mgmt list would clean this up a lot! 
- Entity aChild = Iterables.getFirst(e.getChildren(), null); - if (aChild!=null && isPreRegistered(aChild)) { - log.debug("Managing "+e+" in mode "+initialMode+", doing this recursively because a child is preregistered"); - } else { - log.debug("Managing "+e+" but skipping recursion, as mode is "+initialMode); - isRecursive = false; - } - } - if (!isRecursive) { - manageEntity.apply( (EntityInternal)e ); - } else { - recursively(e, manageEntity); - } - - for (EntityInternal it : allEntities) { - if (!it.getManagementSupport().isFullyManaged()) { - ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); - ManagementTransitionInfo info = new ManagementTransitionInfo(managementContext, mode); - - it.getManagementSupport().onManagementStarted(info); - managementContext.getRebindManager().getChangeListener().onManaged(it); - } - } - } - - @Override - public void unmanage(final Entity e) { - unmanage(e, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.MANAGED_PRIMARY, BrooklynObjectManagementMode.NONEXISTENT)); - } - - public void unmanage(final Entity e, final ManagementTransitionMode mode) { - unmanage(e, mode, false); - } - - private void unmanage(final Entity e, ManagementTransitionMode mode, boolean hasBeenReplaced) { - if (shouldSkipUnmanagement(e)) return; - final ManagementTransitionInfo info = new ManagementTransitionInfo(managementContext, mode); - - if (hasBeenReplaced) { - // we are unmanaging an old instance after having replaced it - // don't unmanage or even clear its fields, because there might be references to it - - if (mode.wasReadOnly()) { - // if coming *from* read only; nothing needed - } else { - if (!mode.wasPrimary()) { - log.warn("Unexpected mode "+mode+" for unmanage-replace "+e+" (applying anyway)"); - } - // migrating away or in-place active partial rebind: - ((EntityInternal)e).getManagementSupport().onManagementStopping(info); - stopTasks(e); - 
((EntityInternal)e).getManagementSupport().onManagementStopped(info); - } - // do not remove from maps below, bail out now - return; - - } else if (mode.wasReadOnly() && mode.isNoLongerLoaded()) { - // we are unmanaging an instance (secondary); either stopping here or primary destroyed elsewhere - ((EntityInternal)e).getManagementSupport().onManagementStopping(info); - unmanageNonRecursive(e); - stopTasks(e); - ((EntityInternal)e).getManagementSupport().onManagementStopped(info); - managementContext.getRebindManager().getChangeListener().onUnmanaged(e); - if (managementContext.getGarbageCollector() != null) managementContext.getGarbageCollector().onUnmanaged(e); - - } else if (mode.wasPrimary() && mode.isNoLongerLoaded()) { - // unmanaging a primary; currently this is done recursively - - /* TODO tidy up when it is recursive and when it isn't; if something is being unloaded or destroyed, - * that probably *is* recursive, but the old mode might be different if in some cases things are read-only. - * or maybe nothing needs to be recursive, we just make sure the callers (e.g. 
HighAvailabilityModeImpl.clearManagedItems) - * call in a good order - * - * see notes above about recursive/manage/All/unmanageAll - */ - - // Need to store all child entities as onManagementStopping removes a child from the parent entity - final List<EntityInternal> allEntities = Lists.newArrayList(); - recursively(e, new Predicate<EntityInternal>() { public boolean apply(EntityInternal it) { - if (shouldSkipUnmanagement(it)) return false; - allEntities.add(it); - it.getManagementSupport().onManagementStopping(info); - return true; - } }); - - for (EntityInternal it : allEntities) { - if (shouldSkipUnmanagement(it)) continue; - unmanageNonRecursive(it); - stopTasks(it); - } - for (EntityInternal it : allEntities) { - it.getManagementSupport().onManagementStopped(info); - managementContext.getRebindManager().getChangeListener().onUnmanaged(it); - if (managementContext.getGarbageCollector() != null) managementContext.getGarbageCollector().onUnmanaged(e); - } - - } else { - log.warn("Invalid mode for unmanage: "+mode+" on "+e+" (ignoring)"); - } - - preRegisteredEntitiesById.remove(e.getId()); - preManagedEntitiesById.remove(e.getId()); - entityProxiesById.remove(e.getId()); - entitiesById.remove(e.getId()); - entityModesById.remove(e.getId()); - } - - private void stopTasks(Entity entity) { - stopTasks(entity, null); - } - - /** stops all tasks (apart from any current one or its descendants) on this entity, - * optionally -- if a timeout is given -- waiting for completion and warning on incomplete tasks */ - @Beta - public void stopTasks(Entity entity, @Nullable Duration timeout) { - CountdownTimer timeleft = timeout==null ? 
null : timeout.countdownTimer(); - // try forcibly interrupting tasks on managed entities - Collection<Exception> exceptions = MutableSet.of(); - try { - Set<Task<?>> tasksCancelled = MutableSet.of(); - for (Task<?> t: managementContext.getExecutionContext(entity).getTasks()) { - if (entity.equals(BrooklynTaskTags.getContextEntity(Tasks.current())) && hasTaskAsAncestor(t, Tasks.current())) { - // don't cancel if we are running inside a task on the target entity and - // the task being considered is one we have submitted -- e.g. on "stop" don't cancel ourselves! - // but if our current task is from another entity we probably do want to cancel them (we are probably invoking unmanage) - continue; - } - - if (!t.isDone()) { - try { - log.debug("Cancelling "+t+" on "+entity); - tasksCancelled.add(t); - t.cancel(true); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - log.debug("Error cancelling "+t+" on "+entity+" (will warn when all tasks are cancelled): "+e, e); - exceptions.add(e); - } - } - } - - if (timeleft!=null) { - Set<Task<?>> tasksIncomplete = MutableSet.of(); - // go through all tasks, not just cancelled ones, in case there are previously cancelled ones which are not complete - for (Task<?> t: managementContext.getExecutionContext(entity).getTasks()) { - if (hasTaskAsAncestor(t, Tasks.current())) - continue; - if (!Tasks.blockUntilInternalTasksEnded(t, timeleft.getDurationRemaining())) { - tasksIncomplete.add(t); - } - } - if (!tasksIncomplete.isEmpty()) { - log.warn("Incomplete tasks when stopping "+entity+": "+tasksIncomplete); - } - if (log.isTraceEnabled()) - log.trace("Cancelled "+tasksCancelled+" tasks for "+entity+", with "+ - timeleft.getDurationRemaining()+" remaining (of "+timeout+"): "+tasksCancelled); - } else { - if (log.isTraceEnabled()) - log.trace("Cancelled "+tasksCancelled+" tasks for "+entity+": "+tasksCancelled); - } - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - log.warn("Error inspecting tasks to cancel on 
unmanagement: "+e, e); - } - if (!exceptions.isEmpty()) - log.warn("Error when cancelling tasks for "+entity+" on unmanagement: "+Exceptions.create(exceptions)); - } - - private boolean hasTaskAsAncestor(Task<?> t, Task<?> potentialAncestor) { - if (t==null || potentialAncestor==null) return false; - if (t.equals(potentialAncestor)) return true; - return hasTaskAsAncestor(t.getSubmittedByTask(), potentialAncestor); - } - - /** - * activates management when effector invoked, warning unless context is acceptable - * (currently only acceptable context is "start") - */ - void manageIfNecessary(Entity entity, Object context) { - if (!isRunning()) { - return; // TODO Still a race for terminate being called, and then isManaged below returning false - } else if (((EntityInternal)entity).getManagementSupport().wasDeployed()) { - return; - } else if (isManaged(entity)) { - return; - } else if (isPreManaged(entity)) { - return; - } else if (Boolean.TRUE.equals(((EntityInternal)entity).getManagementSupport().isReadOnly())) { - return; - } else { - Entity rootUnmanaged = entity; - while (true) { - Entity candidateUnmanagedParent = rootUnmanaged.getParent(); - if (candidateUnmanagedParent == null || isManaged(candidateUnmanagedParent) || isPreManaged(candidateUnmanagedParent)) - break; - rootUnmanaged = candidateUnmanagedParent; - } - if (context == Startable.START.getName()) - log.info("Activating local management for {} on start", rootUnmanaged); - else - log.warn("Activating local management for {} due to effector invocation on {}: {}", new Object[]{rootUnmanaged, entity, context}); - manage(rootUnmanaged); - } - } - - private void recursively(Entity e, Predicate<EntityInternal> action) { - Entity otherPreregistered = preRegisteredEntitiesById.get(e.getId()); - if (otherPreregistered!=null) { - // if something has been pre-registered, prefer it - // (e.g. 
if we recursing through children, we might have a proxy from previous iteration; - // the most recent will have been pre-registered) - e = otherPreregistered; - } - - boolean success = action.apply( (EntityInternal)e ); - if (!success) { - return; // Don't manage children if action false/unnecessary for parent - } - for (Entity child : e.getChildren()) { - recursively(child, action); - } - } - - /** - * Whether the entity is in the process of being managed. - */ - private synchronized boolean isPreManaged(Entity e) { - return preManagedEntitiesById.containsKey(e.getId()); - } - - /** - * Should ensure that the entity is now known about, but should not be accessible from other entities yet. - * - * Records that the given entity is about to be managed (used for answering {@link isPreManaged(Entity)}. - * Note that refs to the given entity are stored in a a weak hashmap so if the subsequent management - * attempt fails then this reference to the entity will eventually be discarded (if no-one else holds - * a reference). - */ - private synchronized boolean preManageNonRecursive(Entity e, ManagementTransitionMode mode) { - Entity realE = toRealEntity(e); - - Object old = preManagedEntitiesById.put(e.getId(), realE); - preRegisteredEntitiesById.remove(e.getId()); - - if (old!=null && mode.wasNotLoaded()) { - if (old.equals(e)) { - log.warn("{} redundant call to pre-start management of entity {}, mode {}; ignoring", new Object[] { this, e, mode }); - } else { - throw new IllegalStateException("call to pre-manage entity "+e+" ("+mode+") but different entity "+old+" already known under that id at "+this); - } - return false; - } else { - if (log.isTraceEnabled()) log.trace("{} pre-start management of entity {}, mode {}", - new Object[] { this, e, mode }); - return true; - } - } - - /** - * Should ensure that the entity is now managed somewhere, and known about in all the lists. 
-     * Returns true if the entity has now become managed; false if it was already managed (anything else throws exception)
-     * @param e the entity (real instance or proxy) whose management is starting
-     * @param mode the management transition being applied; when {@code mode.wasNotLoaded()} an entity
-     *        or proxy already registered under the same id is treated as redundant or as an error,
-     *        otherwise the existing registration is replaced
-     */
-    private synchronized boolean manageNonRecursive(Entity e, ManagementTransitionMode mode) {
-        Entity old = entitiesById.get(e.getId());
-        
-        if (old!=null && mode.wasNotLoaded()) {
-            if (old.equals(e)) {
-                log.warn("{} redundant call to start management of entity {}; ignoring", this, e);
-            } else {
-                throw new IllegalStateException("call to manage entity "+e+" ("+mode+") but different entity "+old+" already known under that id at "+this);
-            }
-            return false;
-        }
-
-        BrooklynLogging.log(log, BrooklynLogging.levelDebugOrTraceIfReadOnly(e),
-            "{} starting management of entity {}", this, e);
-        Entity realE = toRealEntity(e);
-        
-        Entity oldProxy = entityProxiesById.get(e.getId());
-        Entity proxyE;
-        if (oldProxy!=null) {
-            if (mode.wasNotLoaded()) {
-                throw new IllegalStateException("call to manage entity "+e+" from unloaded state ("+mode+") but already had proxy "+oldProxy+" already known under that id at "+this);
-            }
-            // make the old proxy point at this new delegate
-            // (some other tricks done in the call below)
-            ((EntityProxyImpl)(Proxy.getInvocationHandler(oldProxy))).resetDelegate(oldProxy, oldProxy, realE);
-            proxyE = oldProxy;
-        } else {
-            proxyE = toProxyEntityIfAvailable(e);
-        }
-        entityProxiesById.put(e.getId(), proxyE);
-        entityTypes.put(e.getId(), realE.getClass().getName());
-        entitiesById.put(e.getId(), realE);
-        
-        preManagedEntitiesById.remove(e.getId());
-        // only parentless applications are recorded as top-level applications
-        if ((e instanceof Application) && (e.getParent()==null)) {
-            applications.add((Application)proxyE);
-            applicationIds.add(e.getId());
-        }
-        if (!entities.contains(proxyE)) 
-            entities.add(proxyE);
-        
-        if (old!=null && old!=e) {
-            // passing the transition info will ensure the right shutdown steps invoked for old instance
-            unmanage(old, mode, true);
-        }
-        
-        return true;
-    }
-
-    /**
-     * Should ensure that the entity is no longer managed anywhere, remove from all lists.
-     * Returns true if the entity has been removed from management; false if it was not previously managed (anything else throws exception)
-     */
-    private boolean unmanageNonRecursive(Entity e) {
-        /*
-         * When method is synchronized, hit deadlock: 
-         * 1. thread called unmanage() on a member of a group, so we got the lock and called group.removeMember;
-         *    this tries to synchronize on AbstractGroupImpl.members 
-         * 2. another thread was doing AbstractGroupImpl.addMember, which is synchronized on AbstractGroupImpl.members;
-         *    it tries to call Entities.manage(child) which calls LocalEntityManager.getEntity(), which is
-         *    synchronized on this.
-         * 
-         * We MUST NOT call alien code from within the management framework while holding locks. 
-         * The AbstractGroup.removeMember is effectively alien because a user could override it, and because
-         * it is entity specific.
-         * 
-         * TODO Does getting then removing from groups risk this entity being added to other groups while 
-         * this is happening? Should abstractEntity.onManagementStopped or some such remove the entity
-         * from its groups?
-         */
-        
-        // relations (parent, groups, members) are detached outside the lock, for the reason given above
-        if (!getLastManagementTransitionMode(e.getId()).isReadOnly()) {
-            e.clearParent();
-            Collection<Group> groups = e.getGroups();
-            for (Group group : groups) {
-                if (!Entities.isNoLongerManaged(group)) group.removeMember(e);
-            }
-            if (e instanceof Group) {
-                Collection<Entity> members = ((Group)e).getMembers();
-                for (Entity member : members) {
-                    if (!Entities.isNoLongerManaged(member)) member.removeGroup((Group)e);
-                }
-            }
-        } else {
-            log.debug("No relations being updated on unmanage of read only {}", e);
-        }
-
-        // only the bookkeeping of this manager's own records is done while holding the lock
-        synchronized (this) {
-            Entity proxyE = toProxyEntityIfAvailable(e);
-            if (e instanceof Application) {
-                applications.remove(proxyE);
-                applicationIds.remove(e.getId());
-            }
-
-            entities.remove(proxyE);
-            entityProxiesById.remove(e.getId());
-            entityModesById.remove(e.getId());
-            Object old = entitiesById.remove(e.getId());
-
-            entityTypes.remove(e.getId());
-            if (old==null) {
-                log.warn("{} call to stop management of unknown entity (already unmanaged?) {}; ignoring", this, e);
-                return false;
-            } else if (!old.equals(e)) {
-                // shouldn't happen...
-                log.error("{} call to stop management of entity {} removed different entity {}", new Object[] { this, e, old });
-                return true;
-            } else {
-                if (log.isDebugEnabled()) log.debug("{} stopped management of entity {}", this, e);
-                return true;
-            }
-        }
-    }
-
-    /**
-     * Registers a listener on the set of managed entities. The listener is wrapped so that it is
-     * notified through the management context's execution manager rather than on the calling thread.
-     */
-    void addEntitySetListener(CollectionChangeListener<Entity> listener) {
-        //must notify listener in a different thread to avoid deadlock (issue #378)
-        AsyncCollectionChangeAdapter<Entity> wrappedListener = new AsyncCollectionChangeAdapter<Entity>(managementContext.getExecutionManager(), listener);
-        entities.addPropertyChangeListener(new GroovyObservablesPropertyChangeToCollectionChangeAdapter(wrappedListener));
-    }
-
-    /**
-     * Removes a listener previously passed to {@link #addEntitySetListener(CollectionChangeListener)}.
-     */
-    void removeEntitySetListener(CollectionChangeListener<Entity> listener) {
-        // NOTE(review): fresh adapter instances are built here, so removal presumably relies on the
-        // adapter classes implementing equals/hashCode in terms of the wrapped listener -- confirm.
-        AsyncCollectionChangeAdapter<Entity> wrappedListener = new AsyncCollectionChangeAdapter<Entity>(managementContext.getExecutionManager(), listener);
-        entities.removePropertyChangeListener(new GroovyObservablesPropertyChangeToCollectionChangeAdapter(wrappedListener));
-    }
-    
-    /**
-     * Returns true (after logging a warning) if the given entity is null or is not currently managed,
-     * i.e. when there is nothing to unmanage.
-     */
-    private boolean shouldSkipUnmanagement(Entity e) {
-        if (e==null) {
-            // the exception is passed only so that the log shows where the null call came from
-            log.warn(""+this+" call to unmanage null entity; skipping",  
-                new IllegalStateException("source of null unmanagement call to "+this));
-            return true;
-        }
-        if (!isManaged(e)) {
-            log.warn("{} call to stop management of unknown entity (already unmanaged?) {}; skipping, and all descendants", this, e);
-            return true;
-        }
-        return false;
-    }
-    
-    /**
-     * Returns the proxy for the given entity where one is available: the entity itself if it already
-     * is a proxy, the result of {@code getProxy()} for an {@link AbstractEntity} that has one, and
-     * otherwise the entity unchanged.
-     */
-    private Entity toProxyEntityIfAvailable(Entity e) {
-        checkNotNull(e, "entity");
-        
-        if (e instanceof EntityProxy) {
-            return e;
-        } else if (e instanceof AbstractEntity) {
-            Entity result = ((AbstractEntity)e).getProxy();
-            return (result == null) ? e : result;
-        } else {
-            // If we don't already know about the proxy, then use the real thing; presumably it's 
-            // the legacy way of creating the entity so didn't get a preManage() call
-
-            return e;
-        }
-    }
-    
-    /**
-     * Returns the concrete entity behind the given reference: the argument itself if it is an
-     * {@link AbstractEntity}, otherwise the entity recorded under the same id.
-     *
-     * @throws IllegalStateException if no concrete entity is recorded under that id
-     */
-    private Entity toRealEntity(Entity e) {
-        checkNotNull(e, "entity");
-        
-        if (e instanceof AbstractEntity) {
-            return e;
-        } else {
-            Entity result = toRealEntityOrNull(e.getId());
-            if (result == null) {
-                throw new IllegalStateException("No concrete entity known for entity "+e+" ("+e.getId()+", "+e.getEntityType().getName()+")");
-            }
-            return result;
-        }
-    }
-
-    /** Whether the id is recorded as managed, pre-managed or pre-registered by this manager. */
-    public boolean isKnownEntityId(String id) {
-        return entitiesById.containsKey(id) || preManagedEntitiesById.containsKey(id) || preRegisteredEntitiesById.containsKey(id);
-    }
-    
-    /**
-     * Looks up the concrete entity for an id, trying the pre-registered records first, then the
-     * pre-managed ones, then the managed ones; returns null if none of them knows the id.
-     */
-    private Entity toRealEntityOrNull(String id) {
-        Entity result;
-        // prefer the preRegistered and preManaged entities, during hot proxying, they should be newer
-        result = preRegisteredEntitiesById.get(id);
-        if (result==null)
-            result = preManagedEntitiesById.get(id);
-        if (result==null)
-            // the lookup result was previously discarded here, so managed entities were never found
-            result = entitiesById.get(id);
-        return result;
-    }
-    
-    /** Delegates to the management context's {@code isRunning()}. */
-    private boolean isRunning() {
-        return managementContext.isRunning();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/internal/LocalLocationManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/LocalLocationManager.java b/core/src/main/java/brooklyn/management/internal/LocalLocationManager.java deleted file mode 100644 index 31dc036..0000000 --- a/core/src/main/java/brooklyn/management/internal/LocalLocationManager.java +++ /dev/null @@ -1,463 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.management.internal; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Closeable; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.location.LocationSpec; -import org.apache.brooklyn.api.location.ProvisioningLocation; -import org.apache.brooklyn.api.management.AccessController; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.config.BrooklynLogging; -import brooklyn.config.BrooklynLogging.LoggingLevel; -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.Lifecycle; -import brooklyn.entity.proxying.InternalLocationFactory; -import brooklyn.internal.storage.BrooklynStorage; - -import org.apache.brooklyn.location.basic.AbstractLocation; -import org.apache.brooklyn.location.basic.LocationInternal; - -import brooklyn.management.entitlement.Entitlements; -import brooklyn.util.config.ConfigBag; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.exceptions.RuntimeInterruptedException; -import brooklyn.util.stream.Streams; -import brooklyn.util.task.Tasks; - -import com.google.common.annotations.Beta; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; - -public class LocalLocationManager implements LocationManagerInternal { - - @Beta /* expect to remove when API returns LocationSpec or similar */ - public static final ConfigKey<Boolean> CREATE_UNMANAGED = ConfigKeys.newBooleanConfigKey("brooklyn.internal.location.createUnmanaged", - "If set on a location or spec, causes the manager to create it in an unmanaged state (for peeking)", false); - - private static final Logger log = LoggerFactory.getLogger(LocalLocationManager.class); - - private final 
LocalManagementContext managementContext; - private final InternalLocationFactory locationFactory; - - protected final Map<String,Location> locationsById = Maps.newLinkedHashMap(); - private final Map<String, Location> preRegisteredLocationsById = Maps.newLinkedHashMap(); - - /** Management mode for each location */ - protected final Map<String,ManagementTransitionMode> locationModesById = Maps.newLinkedHashMap(); - - private final BrooklynStorage storage; - private Map<String, String> locationTypes; - - private static AtomicLong LOCATION_CNT = new AtomicLong(0); - - public LocalLocationManager(LocalManagementContext managementContext) { - this.managementContext = checkNotNull(managementContext, "managementContext"); - this.locationFactory = new InternalLocationFactory(managementContext); - - this.storage = managementContext.getStorage(); - locationTypes = storage.getMap("locations"); - } - - public InternalLocationFactory getLocationFactory() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return locationFactory; - - } - - @Override - public <T extends Location> T createLocation(LocationSpec<T> spec) { - try { - boolean createUnmanaged = ConfigBag.coerceFirstNonNullKeyValue(CREATE_UNMANAGED, - spec.getConfig().get(CREATE_UNMANAGED), spec.getFlags().get(CREATE_UNMANAGED.getName())); - if (createUnmanaged) { - spec.removeConfig(CREATE_UNMANAGED); - } - - T loc = locationFactory.createLocation(spec); - if (!createUnmanaged) { - manage(loc); - } else { - // remove references - Location parent = loc.getParent(); - if (parent!=null) { - ((AbstractLocation)parent).removeChild(loc); - } - preRegisteredLocationsById.remove(loc.getId()); - } - - return loc; - } catch (Throwable e) { - log.warn("Failed to create location using spec "+spec+" (rethrowing)", e); - throw Exceptions.propagate(e); - } - } - - @Override - public <T extends Location> T createLocation(Map<?,?> config, Class<T> type) { - return 
createLocation(LocationSpec.create(config, type)); - } - - @Override - public synchronized Collection<Location> getLocations() { - return ImmutableList.copyOf(locationsById.values()); - } - - @Override - public Collection<String> getLocationIds() { - return ImmutableList.copyOf(locationsById.keySet()); - } - - @Override - public synchronized Location getLocation(String id) { - return locationsById.get(id); - } - - public synchronized Location getLocationEvenIfPreManaged(String id) { - Location result = locationsById.get(id); - if (result == null) { - result = preRegisteredLocationsById.get(id); - } - return result; - } - - @Override - public boolean isManaged(Location loc) { - return (isRunning() && loc != null && getLocation(loc.getId()) != null); - } - - synchronized boolean isPreRegistered(Location loc) { - return preRegisteredLocationsById.containsKey(loc.getId()); - } - - public boolean isKnownLocationId(String id) { - return preRegisteredLocationsById.containsKey(id) || locationsById.containsKey(id); - } - - synchronized void prePreManage(Location loc) { - if (isPreRegistered(loc)) { - log.warn(""+this+" redundant call to pre-pre-manage location "+loc+"; skipping", - new Exception("source of duplicate pre-pre-manage of "+loc)); - return; - } - preRegisteredLocationsById.put(loc.getId(), loc); - } - - @Override - public ManagementTransitionMode getLastManagementTransitionMode(String itemId) { - return locationModesById.get(itemId); - } - - @Override - public void setManagementTransitionMode(Location item, ManagementTransitionMode mode) { - locationModesById.put(item.getId(), mode); - } - - // TODO synchronization issues here: see comment in LocalEntityManager.manage(Entity) - /** management on creation */ - @Override - public Location manage(Location loc) { - if (isManaged(loc)) { - // TODO put log.warn back in if/when manage(Location) becomes private; or could even have assert. - // Can be stricter about contract. 
- return loc; - } - - Location parent = loc.getParent(); - if (parent != null && !managementContext.getLocationManager().isManaged(parent)) { - log.warn("Parent location "+parent+" of "+loc+" is not managed; attempting to manage it (in future this may be disallowed)"); - return manage(parent); - } else { - return manageRecursive(loc, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.NONEXISTENT, BrooklynObjectManagementMode.MANAGED_PRIMARY)); - } - } - - @Override - public void manageRebindedRoot(Location item) { - ManagementTransitionMode mode = getLastManagementTransitionMode(item.getId()); - Preconditions.checkNotNull(mode, "Mode not set for rebinding %s", item); - manageRecursive(item, mode); - } - - protected void checkManagementAllowed(Location item) { - AccessController.Response access = managementContext.getAccessController().canManageLocation(item); - if (!access.isAllowed()) { - throw new IllegalStateException("Access controller forbids management of "+item+": "+access.getMsg()); - } - } - - protected Location manageRecursive(Location loc, final ManagementTransitionMode initialMode) { - // TODO see comments in LocalEntityManager about recursive management / manageRebindRoot v manageAll - - AccessController.Response access = managementContext.getAccessController().canManageLocation(loc); - if (!access.isAllowed()) { - throw new IllegalStateException("Access controller forbids management of "+loc+": "+access.getMsg()); - } - - long count = LOCATION_CNT.incrementAndGet(); - if (log.isDebugEnabled()) { - String msg = "Managing location " + loc + " ("+initialMode+"), from " + Tasks.current()+" / "+Entitlements.getEntitlementContext(); - LoggingLevel level = (!initialMode.wasNotLoaded() || initialMode.isReadOnly() ? 
LoggingLevel.TRACE : LoggingLevel.DEBUG); - if (count % 100 == 0) { - // include trace periodically in case we get leaks or too much location management - BrooklynLogging.log(log, level, - msg, new Exception("Informational stack trace of call to manage location "+loc+" ("+count+" calls; "+getLocations().size()+" currently managed)")); - } else { - BrooklynLogging.log(log, level, msg); - } - } - - recursively(loc, new Predicate<AbstractLocation>() { public boolean apply(AbstractLocation it) { - ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); - if (mode==null) { - setManagementTransitionMode(it, mode = initialMode); - } - - if (it.isManaged()) { - if (mode.wasNotLoaded()) { - // silently bail out - return false; - } else { - // on rebind, we just replace, fall through to below - } - } - - boolean result = manageNonRecursive(it, mode); - if (result) { - it.setManagementContext(managementContext); - if (mode.isPrimary()) { - it.onManagementStarted(); - if (mode.isCreating()) { - // Never record event on rebind; this isn't the location (e.g. the VM) being "created" - // so don't tell listeners that. - // TODO The location-event history should be persisted; currently it is lost on - // rebind, unless there is a listener that is persisting the state externally itself. 
- recordLocationEvent(it, Lifecycle.CREATED); - } - } - managementContext.getRebindManager().getChangeListener().onManaged(it); - } - return result; - } }); - return loc; - } - - @Override - public void unmanage(final Location loc) { - unmanage(loc, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.MANAGED_PRIMARY, BrooklynObjectManagementMode.NONEXISTENT)); - } - - public void unmanage(final Location loc, final ManagementTransitionMode mode) { - unmanage(loc, mode, false); - } - - private void unmanage(final Location loc, final ManagementTransitionMode mode, boolean hasBeenReplaced) { - if (shouldSkipUnmanagement(loc)) return; - - if (hasBeenReplaced) { - // we are unmanaging an old instance after having replaced it; - // don't unmanage or even clear its fields, because there might be references to it - - if (mode.wasReadOnly()) { - // if coming *from* read only; nothing needed - } else { - if (!mode.wasPrimary()) { - log.warn("Unexpected mode "+mode+" for unmanage-replace "+loc+" (applying anyway)"); - } - // migrating away or in-place active partial rebind: - managementContext.getRebindManager().getChangeListener().onUnmanaged(loc); - if (managementContext.gc != null) managementContext.gc.onUnmanaged(loc); - } - // do not remove from maps below, bail out now - return; - - } else if ((mode.wasPrimary() && mode.isReadOnly()) || (mode.wasReadOnly() && mode.isNoLongerLoaded())) { - if (mode.isReadOnly() && mode.wasPrimary()) { - // TODO shouldn't this fall into "hasBeenReplaced" above? 
- log.debug("Unmanaging on demotion: "+loc+" ("+mode+")"); - } - // we are unmanaging an instance whose primary management is elsewhere (either we were secondary, or we are being demoted) - unmanageNonRecursiveRemoveFromRecords(loc, mode); - managementContext.getRebindManager().getChangeListener().onUnmanaged(loc); - if (managementContext.gc != null) managementContext.gc.onUnmanaged(loc); - unmanageNonRecursiveClearItsFields(loc, mode); - - } else if (mode.isNoLongerLoaded()) { - // Need to store all child entities as onManagementStopping removes a child from the parent entity - - // As above, see TODO in LocalEntityManager about recursive management / unmanagement v manageAll/unmanageAll - recursively(loc, new Predicate<AbstractLocation>() { public boolean apply(AbstractLocation it) { - if (shouldSkipUnmanagement(it)) return false; - boolean result = unmanageNonRecursiveRemoveFromRecords(it, mode); - if (result) { - ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); - if (mode==null) { - // ad hoc creation e.g. 
tests - log.debug("Missing transition mode for "+it+" when unmanaging; assuming primary/destroying"); - mode = ManagementTransitionMode.guessing(BrooklynObjectManagementMode.MANAGED_PRIMARY, BrooklynObjectManagementMode.NONEXISTENT); - } - if (mode.wasPrimary()) it.onManagementStopped(); - managementContext.getRebindManager().getChangeListener().onUnmanaged(it); - if (mode.isDestroying()) recordLocationEvent(it, Lifecycle.DESTROYED); - if (managementContext.gc != null) managementContext.gc.onUnmanaged(it); - } - unmanageNonRecursiveClearItsFields(loc, mode); - return result; - } }); - - } else { - log.warn("Invalid mode for unmanage: "+mode+" on "+loc+" (ignoring)"); - } - - if (loc instanceof Closeable) { - Streams.closeQuietly( (Closeable)loc ); - } - - locationsById.remove(loc.getId()); - preRegisteredLocationsById.remove(loc.getId()); - locationModesById.remove(loc.getId()); - locationTypes.remove(loc.getId()); - } - - /** - * Adds this location event to the usage record for the given location (creating the usage - * record if one does not already exist). - */ - private void recordLocationEvent(LocationInternal loc, Lifecycle state) { - try { - managementContext.getUsageManager().recordLocationEvent(loc, state); - } catch (RuntimeInterruptedException e) { - throw e; - } catch (RuntimeException e) { - log.warn("Failed to store location lifecycle event for "+loc+" (ignoring)", e); - } - } - - private void recursively(Location e, Predicate<AbstractLocation> action) { - boolean success = action.apply( (AbstractLocation)e ); - if (!success) { - return; // Don't manage children if action false/unnecessary for parent - } - for (Location child : e.getChildren()) { - recursively(child, action); - } - } - - /** - * Should ensure that the location is now managed somewhere, and known about in all the lists. 
- * Returns true if the location has now become managed; false if it was already managed (anything else throws exception) - * @param rebindPrimary true if rebinding primary, false if rebinding as copy, null if creating (not rebinding) - */ - private synchronized boolean manageNonRecursive(Location loc, ManagementTransitionMode mode) { - Location old = locationsById.put(loc.getId(), loc); - preRegisteredLocationsById.remove(loc.getId()); - - locationTypes.put(loc.getId(), loc.getClass().getName()); - - if (old!=null && mode.wasNotLoaded()) { - if (old.equals(loc)) { - log.warn("{} redundant call to start management of location {}", this, loc); - } else { - throw new IllegalStateException("call to manage location "+loc+" but different location "+old+" already known under that id at "+this); - } - return false; - } - - if (old!=null && old!=loc) { - // passing the transition info will ensure the right shutdown steps invoked for old instance - unmanage(old, mode, true); - } - - return true; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private synchronized void unmanageNonRecursiveClearItsFields(Location loc, ManagementTransitionMode mode) { - if (mode.isDestroying()) { - ((AbstractLocation)loc).setParent(null, true); - - Location parent = ((AbstractLocation)loc).getParent(); - if (parent instanceof ProvisioningLocation<?>) { - try { - ((ProvisioningLocation)parent).release(loc); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - log.debug("Error releasing "+loc+" in its parent "+parent+": "+e); - } - } - } else { - // if not destroying, don't change the parent's children list - ((AbstractLocation)loc).setParent(null, false); - } - // clear config to help with GC; i know you're not supposed to, but this seems to help, else config bag is littered with refs to entities etc - // FIXME relies on config().getLocalBag() returning the underlying bag! 
- ((AbstractLocation)loc).config().getLocalBag().clear(); - } - - /** - * Should ensure that the location is no longer managed anywhere, remove from all lists. - * Returns true if the location has been removed from management; if it was not previously managed (anything else throws exception) - */ - private synchronized boolean unmanageNonRecursiveRemoveFromRecords(Location loc, ManagementTransitionMode mode) { - Object old = locationsById.remove(loc.getId()); - locationTypes.remove(loc.getId()); - locationModesById.remove(loc.getId()); - - if (old==null) { - log.warn("{} call to stop management of unknown location (already unmanaged?) {}; ignoring", this, loc); - return false; - } else if (!old.equals(loc)) { - // shouldn't happen... - log.error("{} call to stop management of location {} removed different location {}; ignoring", new Object[] { this, loc, old }); - return true; - } else { - if (log.isDebugEnabled()) log.debug("{} stopped management of location {}", this, loc); - return true; - } - } - - private boolean shouldSkipUnmanagement(Location loc) { - if (loc==null) { - log.warn(""+this+" call to unmanage null location; skipping", - new IllegalStateException("source of null unmanagement call to "+this)); - return true; - } - if (!isManaged(loc)) { - log.warn("{} call to stop management of unknown location (already unmanaged?) 
{}; skipping, and all descendants", this, loc); - return true; - } - return false; - } - - private boolean isRunning() { - return managementContext.isRunning(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java b/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java deleted file mode 100644 index db9b33b..0000000 --- a/core/src/main/java/brooklyn/management/internal/LocalManagementContext.java +++ /dev/null @@ -1,421 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.management.internal; - -import static brooklyn.util.JavaGroovyEquivalents.elvis; -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.WeakHashMap; -import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.apache.brooklyn.api.entity.Application; -import org.apache.brooklyn.api.entity.Effector; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.management.AccessController; -import org.apache.brooklyn.api.management.ExecutionContext; -import org.apache.brooklyn.api.management.ExecutionManager; -import org.apache.brooklyn.api.management.ManagementContext; -import org.apache.brooklyn.api.management.SubscriptionManager; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.api.management.TaskAdaptable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.config.BrooklynProperties; -import brooklyn.config.BrooklynProperties.Factory.Builder; -import brooklyn.entity.drivers.downloads.BasicDownloadsManager; -import brooklyn.entity.effector.Effectors; -import brooklyn.entity.proxying.InternalEntityFactory; -import brooklyn.entity.proxying.InternalLocationFactory; -import brooklyn.entity.proxying.InternalPolicyFactory; -import brooklyn.internal.BrooklynFeatureEnablement; -import brooklyn.internal.storage.DataGridFactory; -import brooklyn.management.entitlement.Entitlements; -import brooklyn.management.ha.OsgiManager; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.guava.Maybe; -import brooklyn.util.task.BasicExecutionContext; -import brooklyn.util.task.BasicExecutionManager; -import brooklyn.util.task.DynamicTasks; -import brooklyn.util.task.TaskTags; -import 
brooklyn.util.task.Tasks; -import brooklyn.util.text.Strings; - -import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableSet; - -/** - * A local (single node) implementation of the {@link ManagementContext} API. - */ -public class LocalManagementContext extends AbstractManagementContext { - - private static final Logger log = LoggerFactory.getLogger(LocalManagementContext.class); - - private static final Set<LocalManagementContext> INSTANCES = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<LocalManagementContext, Boolean>())); - - private final Builder builder; - - private final List<ManagementContext.PropertiesReloadListener> reloadListeners = new CopyOnWriteArrayList<ManagementContext.PropertiesReloadListener>(); - - @VisibleForTesting - static Set<LocalManagementContext> getInstances() { - synchronized (INSTANCES) { - return ImmutableSet.copyOf(INSTANCES); - } - } - - // Note also called reflectively by BrooklynLeakListener - public static void logAll(Logger logger){ - for (LocalManagementContext context : getInstances()) { - logger.warn("Management Context "+context+" running, creation stacktrace:\n" + Throwables.getStackTraceAsString(context.constructionStackTrace)); - } - } - - /** terminates all (best effort); returns count of sessions closed; if exceptions thrown, returns negative number. - * semantics might change, particular in dealing with interminable mgmt contexts. 
*/ - // Note also called reflectively by BrooklynLeakListener - @Beta - public static int terminateAll() { - int closed=0,dangling=0; - for (LocalManagementContext context : getInstances()) { - try { - context.terminate(); - closed++; - }catch (Throwable t) { - Exceptions.propagateIfFatal(t); - log.warn("Failed to terminate management context", t); - dangling++; - } - } - if (dangling>0) return -dangling; - return closed; - } - - private String managementPlaneId; - private String managementNodeId; - private BasicExecutionManager execution; - private SubscriptionManager subscriptions; - private LocalEntityManager entityManager; - private final LocalLocationManager locationManager; - private final LocalAccessManager accessManager; - private final LocalUsageManager usageManager; - private OsgiManager osgiManager; - - public final Throwable constructionStackTrace = new Throwable("for construction stacktrace").fillInStackTrace(); - - private final Map<String, Object> brooklynAdditionalProperties; - - /** - * Creates a LocalManagement with default BrooklynProperties. - */ - public LocalManagementContext() { - this(BrooklynProperties.Factory.builderDefault()); - } - - public LocalManagementContext(BrooklynProperties brooklynProperties) { - this(brooklynProperties, (DataGridFactory)null); - } - - /** - * Creates a new LocalManagementContext. - * - * @param brooklynProperties the BrooklynProperties. - * @param datagridFactory the DataGridFactory to use. If this instance is null, it means that the system - * is going to use BrooklynProperties to figure out which instance to load or otherwise - * use a default instance. 
- */ - @VisibleForTesting - public LocalManagementContext(BrooklynProperties brooklynProperties, DataGridFactory datagridFactory) { - this(Builder.fromProperties(brooklynProperties), datagridFactory); - } - - public LocalManagementContext(Builder builder) { - this(builder, null, null); - } - - public LocalManagementContext(Builder builder, DataGridFactory datagridFactory) { - this(builder, null, datagridFactory); - } - - public LocalManagementContext(Builder builder, Map<String, Object> brooklynAdditionalProperties) { - this(builder, brooklynAdditionalProperties, null); - } - - public LocalManagementContext(BrooklynProperties brooklynProperties, Map<String, Object> brooklynAdditionalProperties) { - this(Builder.fromProperties(brooklynProperties), brooklynAdditionalProperties, null); - } - - public LocalManagementContext(Builder builder, Map<String, Object> brooklynAdditionalProperties, DataGridFactory datagridFactory) { - super(builder.build(), datagridFactory); - - checkNotNull(configMap, "brooklynProperties"); - - // TODO in a persisted world the planeId may be injected - this.managementPlaneId = Strings.makeRandomId(8); - this.managementNodeId = Strings.makeRandomId(8); - this.builder = builder; - this.brooklynAdditionalProperties = brooklynAdditionalProperties; - if (brooklynAdditionalProperties != null) - configMap.addFromMap(brooklynAdditionalProperties); - - BrooklynFeatureEnablement.init(configMap); - - this.locationManager = new LocalLocationManager(this); - this.accessManager = new LocalAccessManager(); - this.usageManager = new LocalUsageManager(this); - - if (configMap.getConfig(OsgiManager.USE_OSGI)) { - this.osgiManager = new OsgiManager(this); - osgiManager.start(); - } - - INSTANCES.add(this); - log.debug("Created management context "+this); - } - - @Override - public String getManagementPlaneId() { - return managementPlaneId; - } - - @Override - public String getManagementNodeId() { - return managementNodeId; - } - - @Override - public void 
prePreManage(Entity entity) { - getEntityManager().prePreManage(entity); - } - - @Override - public void prePreManage(Location location) { - getLocationManager().prePreManage(location); - } - - @Override - public synchronized Collection<Application> getApplications() { - return getEntityManager().getApplications(); - } - - @Override - public void addEntitySetListener(CollectionChangeListener<Entity> listener) { - getEntityManager().addEntitySetListener(listener); - } - - @Override - public void removeEntitySetListener(CollectionChangeListener<Entity> listener) { - getEntityManager().removeEntitySetListener(listener); - } - - @Override - protected void manageIfNecessary(Entity entity, Object context) { - getEntityManager().manageIfNecessary(entity, context); - } - - @Override - public synchronized LocalEntityManager getEntityManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - - if (entityManager == null) { - entityManager = new LocalEntityManager(this); - } - return entityManager; - } - - @Override - public InternalEntityFactory getEntityFactory() { - return getEntityManager().getEntityFactory(); - } - - @Override - public InternalLocationFactory getLocationFactory() { - return getLocationManager().getLocationFactory(); - } - - @Override - public InternalPolicyFactory getPolicyFactory() { - return getEntityManager().getPolicyFactory(); - } - - @Override - public synchronized LocalLocationManager getLocationManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return locationManager; - } - - @Override - public synchronized LocalAccessManager getAccessManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return accessManager; - } - - @Override - public synchronized LocalUsageManager getUsageManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return 
usageManager; - } - - @Override - public synchronized Maybe<OsgiManager> getOsgiManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - if (osgiManager==null) return Maybe.absent("OSGi not available in this instance"); - return Maybe.of(osgiManager); - } - - @Override - public synchronized AccessController getAccessController() { - return getAccessManager().getAccessController(); - } - - @Override - public synchronized SubscriptionManager getSubscriptionManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - - if (subscriptions == null) { - subscriptions = new LocalSubscriptionManager(getExecutionManager()); - } - return subscriptions; - } - - @Override - public synchronized ExecutionManager getExecutionManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - - if (execution == null) { - execution = new BasicExecutionManager(getManagementNodeId()); - gc = new BrooklynGarbageCollector(configMap, execution, getStorage()); - } - return execution; - } - - @Override - public void terminate() { - INSTANCES.remove(this); - super.terminate(); - if (osgiManager!=null) { - osgiManager.stop(); - osgiManager = null; - } - if (usageManager != null) usageManager.terminate(); - if (execution != null) execution.shutdownNow(); - if (gc != null) gc.shutdownNow(); - } - - @Override - protected void finalize() { - terminate(); - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public <T> Task<T> runAtEntity(Map flags, Entity entity, Callable<T> c) { - manageIfNecessary(entity, elvis(Arrays.asList(flags.get("displayName"), flags.get("description"), flags, c))); - return runAtEntity(entity, Tasks.<T>builder().dynamic(true).body(c).flags(flags).build()); - } - - protected <T> Task<T> runAtEntity(Entity entity, TaskAdaptable<T> task) { - getExecutionContext(entity).submit(task); - if 
(DynamicTasks.getTaskQueuingContext()!=null) { - // put it in the queueing context so it appears in the GUI - // mark it inessential as this is being invoked from code, - // the caller will do 'get' to handle errors - TaskTags.markInessential(task); - DynamicTasks.getTaskQueuingContext().queue(task.asTask()); - } - return task.asTask(); - } - - @Override - protected <T> Task<T> runAtEntity(final Entity entity, final Effector<T> eff, @SuppressWarnings("rawtypes") final Map parameters) { - manageIfNecessary(entity, eff); - // prefer to submit this from the current execution context so it sets up correct cross-context chaining - ExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext(); - if (ec == null) { - log.debug("Top-level effector invocation: {} on {}", eff, entity); - ec = getExecutionContext(entity); - } - return runAtEntity(entity, Effectors.invocation(entity, eff, parameters)); - } - - @Override - public boolean isManagedLocally(Entity e) { - return true; - } - - @Override - public String toString() { - return LocalManagementContext.class.getSimpleName()+"["+getManagementPlaneId()+"-"+getManagementNodeId()+"]"; - } - - @Override - public void reloadBrooklynProperties() { - log.info("Reloading brooklyn properties from " + builder); - if (builder.hasDelegateOriginalProperties()) - log.warn("When reloading, mgmt context "+this+" properties are fixed, so reload will be of limited utility"); - - BrooklynProperties properties = builder.build(); - configMap = properties; - if (brooklynAdditionalProperties != null) { - log.info("Reloading additional brooklyn properties from " + brooklynAdditionalProperties); - configMap.addFromMap(brooklynAdditionalProperties); - } - this.downloadsManager = BasicDownloadsManager.newDefault(configMap); - this.entitlementManager = Entitlements.newManager(this, configMap); - - clearLocationRegistry(); - - BrooklynFeatureEnablement.init(configMap); - - // Notify listeners that properties have been reloaded - for 
(PropertiesReloadListener listener : reloadListeners) { - listener.reloaded(); - } - } - - @VisibleForTesting - public void clearLocationRegistry() { - // Force reload of location registry - this.locationRegistry = null; - } - - @Override - public void addPropertiesReloadListener(PropertiesReloadListener listener) { - reloadListeners.add(checkNotNull(listener, "listener")); - } - - @Override - public void removePropertiesReloadListener(PropertiesReloadListener listener) { - reloadListeners.remove(listener); - } - - public void noteStartupComplete() { - startupComplete = true; - } -}
