http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalLocationManager.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalLocationManager.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalLocationManager.java deleted file mode 100644 index dcd1b43..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalLocationManager.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Closeable; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.location.LocationSpec; -import org.apache.brooklyn.api.location.ProvisioningLocation; -import org.apache.brooklyn.api.mgmt.AccessController; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.BrooklynLogging; -import org.apache.brooklyn.core.BrooklynLogging.LoggingLevel; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; -import org.apache.brooklyn.core.internal.storage.BrooklynStorage; -import org.apache.brooklyn.core.location.AbstractLocation; -import org.apache.brooklyn.core.location.internal.LocationInternal; -import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; -import org.apache.brooklyn.core.objs.proxy.InternalLocationFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; -import org.apache.brooklyn.util.stream.Streams; - -import com.google.common.annotations.Beta; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Maps; - -public class LocalLocationManager implements LocationManagerInternal { - - @Beta /* expect to remove when API returns LocationSpec or similar */ - public static final ConfigKey<Boolean> CREATE_UNMANAGED = ConfigKeys.newBooleanConfigKey("brooklyn.internal.location.createUnmanaged", - "If set on a location or spec, causes the manager to create it in an unmanaged state (for peeking)", false); - - private static final Logger log = LoggerFactory.getLogger(LocalLocationManager.class); - - private final LocalManagementContext managementContext; - private final InternalLocationFactory locationFactory; - - protected final Map<String,Location> locationsById = Maps.newLinkedHashMap(); - private final Map<String, Location> preRegisteredLocationsById = Maps.newLinkedHashMap(); - - /** Management mode for each location */ - protected final Map<String,ManagementTransitionMode> locationModesById = Maps.newLinkedHashMap(); - - private final BrooklynStorage storage; - private Map<String, String> locationTypes; - - private static AtomicLong LOCATION_CNT = new AtomicLong(0); - - public LocalLocationManager(LocalManagementContext managementContext) { - this.managementContext = checkNotNull(managementContext, "managementContext"); - this.locationFactory = new InternalLocationFactory(managementContext); - - this.storage = managementContext.getStorage(); - locationTypes = storage.getMap("locations"); - } - - public InternalLocationFactory getLocationFactory() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return locationFactory; - - } - - @Override - public <T extends Location> T createLocation(LocationSpec<T> spec) { - try { - boolean createUnmanaged = ConfigBag.coerceFirstNonNullKeyValue(CREATE_UNMANAGED, - spec.getConfig().get(CREATE_UNMANAGED), spec.getFlags().get(CREATE_UNMANAGED.getName())); - if (createUnmanaged) { - spec.removeConfig(CREATE_UNMANAGED); - } - - T loc = locationFactory.createLocation(spec); - if (!createUnmanaged) { - manage(loc); - } else { - // remove references - Location parent = loc.getParent(); - if (parent!=null) { - ((AbstractLocation)parent).removeChild(loc); - } - preRegisteredLocationsById.remove(loc.getId()); - } - - return loc; - } catch (Throwable e) { - log.warn("Failed to create location using spec "+spec+" (rethrowing)", e); - throw Exceptions.propagate(e); - } - } - - @Override - public <T extends Location> T createLocation(Map<?,?> config, Class<T> type) { - return createLocation(LocationSpec.create(config, type)); - } - - @Override - public synchronized Collection<Location> getLocations() { - return ImmutableList.copyOf(locationsById.values()); - } - - @Override - public Collection<String> getLocationIds() { - return ImmutableList.copyOf(locationsById.keySet()); - } - - @Override - public synchronized Location getLocation(String id) { - return locationsById.get(id); - } - - public synchronized Location getLocationEvenIfPreManaged(String id) { - Location result = locationsById.get(id); - if (result == null) { - result = preRegisteredLocationsById.get(id); - } - return result; - } - - @Override - public boolean isManaged(Location loc) { - return (isRunning() && loc != null && getLocation(loc.getId()) != null); - } - - synchronized boolean isPreRegistered(Location loc) { - return preRegisteredLocationsById.containsKey(loc.getId()); - } - - public boolean isKnownLocationId(String id) { - return preRegisteredLocationsById.containsKey(id) || locationsById.containsKey(id); - } - - synchronized void prePreManage(Location loc) { - if (isPreRegistered(loc)) { - log.warn(""+this+" redundant call to pre-pre-manage location "+loc+"; skipping", - new Exception("source of duplicate pre-pre-manage of "+loc)); - return; - } - preRegisteredLocationsById.put(loc.getId(), loc); - } - - @Override - public ManagementTransitionMode getLastManagementTransitionMode(String itemId) { - return locationModesById.get(itemId); - } - - @Override - public void setManagementTransitionMode(Location item, ManagementTransitionMode mode) { - locationModesById.put(item.getId(), mode); - } - - // TODO synchronization issues here: see comment in LocalEntityManager.manage(Entity) - /** management on creation */ - @Override - public Location manage(Location loc) { - if (isManaged(loc)) { - // TODO put log.warn back in if/when manage(Location) becomes private; or could even have assert. - // Can be stricter about contract. - return loc; - } - - Location parent = loc.getParent(); - if (parent != null && !managementContext.getLocationManager().isManaged(parent)) { - log.warn("Parent location "+parent+" of "+loc+" is not managed; attempting to manage it (in future this may be disallowed)"); - return manage(parent); - } else { - return manageRecursive(loc, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.NONEXISTENT, BrooklynObjectManagementMode.MANAGED_PRIMARY)); - } - } - - @Override - public void manageRebindedRoot(Location item) { - ManagementTransitionMode mode = getLastManagementTransitionMode(item.getId()); - Preconditions.checkNotNull(mode, "Mode not set for rebinding %s", item); - manageRecursive(item, mode); - } - - protected void checkManagementAllowed(Location item) { - AccessController.Response access = managementContext.getAccessController().canManageLocation(item); - if (!access.isAllowed()) { - throw new IllegalStateException("Access controller forbids management of "+item+": "+access.getMsg()); - } - } - - protected Location manageRecursive(Location loc, final ManagementTransitionMode initialMode) { - // TODO see comments in LocalEntityManager about recursive management / manageRebindRoot v manageAll - - AccessController.Response access = managementContext.getAccessController().canManageLocation(loc); - if (!access.isAllowed()) { - throw new IllegalStateException("Access controller forbids management of "+loc+": "+access.getMsg()); - } - - long count = LOCATION_CNT.incrementAndGet(); - if (log.isDebugEnabled()) { - String msg = "Managing location " + loc + " ("+initialMode+"), from " + Tasks.current()+" / "+Entitlements.getEntitlementContext(); - LoggingLevel level = (!initialMode.wasNotLoaded() || initialMode.isReadOnly() ? LoggingLevel.TRACE : LoggingLevel.DEBUG); - if (count % 100 == 0) { - // include trace periodically in case we get leaks or too much location management - BrooklynLogging.log(log, level, - msg, new Exception("Informational stack trace of call to manage location "+loc+" ("+count+" calls; "+getLocations().size()+" currently managed)")); - } else { - BrooklynLogging.log(log, level, msg); - } - } - - recursively(loc, new Predicate<AbstractLocation>() { public boolean apply(AbstractLocation it) { - ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); - if (mode==null) { - setManagementTransitionMode(it, mode = initialMode); - } - - if (it.isManaged()) { - if (mode.wasNotLoaded()) { - // silently bail out - return false; - } else { - // on rebind, we just replace, fall through to below - } - } - - boolean result = manageNonRecursive(it, mode); - if (result) { - it.setManagementContext(managementContext); - if (mode.isPrimary()) { - it.onManagementStarted(); - if (mode.isCreating()) { - // Never record event on rebind; this isn't the location (e.g. the VM) being "created" - // so don't tell listeners that. - // TODO The location-event history should be persisted; currently it is lost on - // rebind, unless there is a listener that is persisting the state externally itself. - recordLocationEvent(it, Lifecycle.CREATED); - } - } - managementContext.getRebindManager().getChangeListener().onManaged(it); - } - return result; - } }); - return loc; - } - - @Override - public void unmanage(final Location loc) { - unmanage(loc, ManagementTransitionMode.guessing(BrooklynObjectManagementMode.MANAGED_PRIMARY, BrooklynObjectManagementMode.NONEXISTENT)); - } - - public void unmanage(final Location loc, final ManagementTransitionMode mode) { - unmanage(loc, mode, false); - } - - private void unmanage(final Location loc, final ManagementTransitionMode mode, boolean hasBeenReplaced) { - if (shouldSkipUnmanagement(loc)) return; - - if (hasBeenReplaced) { - // we are unmanaging an old instance after having replaced it; - // don't unmanage or even clear its fields, because there might be references to it - - if (mode.wasReadOnly()) { - // if coming *from* read only; nothing needed - } else { - if (!mode.wasPrimary()) { - log.warn("Unexpected mode "+mode+" for unmanage-replace "+loc+" (applying anyway)"); - } - // migrating away or in-place active partial rebind: - managementContext.getRebindManager().getChangeListener().onUnmanaged(loc); - if (managementContext.gc != null) managementContext.gc.onUnmanaged(loc); - } - // do not remove from maps below, bail out now - return; - - } else if ((mode.wasPrimary() && mode.isReadOnly()) || (mode.wasReadOnly() && mode.isNoLongerLoaded())) { - if (mode.isReadOnly() && mode.wasPrimary()) { - // TODO shouldn't this fall into "hasBeenReplaced" above? - log.debug("Unmanaging on demotion: "+loc+" ("+mode+")"); - } - // we are unmanaging an instance whose primary management is elsewhere (either we were secondary, or we are being demoted) - unmanageNonRecursiveRemoveFromRecords(loc, mode); - managementContext.getRebindManager().getChangeListener().onUnmanaged(loc); - if (managementContext.gc != null) managementContext.gc.onUnmanaged(loc); - unmanageNonRecursiveClearItsFields(loc, mode); - - } else if (mode.isNoLongerLoaded()) { - // Need to store all child entities as onManagementStopping removes a child from the parent entity - - // As above, see TODO in LocalEntityManager about recursive management / unmanagement v manageAll/unmanageAll - recursively(loc, new Predicate<AbstractLocation>() { public boolean apply(AbstractLocation it) { - if (shouldSkipUnmanagement(it)) return false; - boolean result = unmanageNonRecursiveRemoveFromRecords(it, mode); - if (result) { - ManagementTransitionMode mode = getLastManagementTransitionMode(it.getId()); - if (mode==null) { - // ad hoc creation e.g. tests - log.debug("Missing transition mode for "+it+" when unmanaging; assuming primary/destroying"); - mode = ManagementTransitionMode.guessing(BrooklynObjectManagementMode.MANAGED_PRIMARY, BrooklynObjectManagementMode.NONEXISTENT); - } - if (mode.wasPrimary()) it.onManagementStopped(); - managementContext.getRebindManager().getChangeListener().onUnmanaged(it); - if (mode.isDestroying()) recordLocationEvent(it, Lifecycle.DESTROYED); - if (managementContext.gc != null) managementContext.gc.onUnmanaged(it); - } - unmanageNonRecursiveClearItsFields(loc, mode); - return result; - } }); - - } else { - log.warn("Invalid mode for unmanage: "+mode+" on "+loc+" (ignoring)"); - } - - if (loc instanceof Closeable) { - Streams.closeQuietly( (Closeable)loc ); - } - - locationsById.remove(loc.getId()); - preRegisteredLocationsById.remove(loc.getId()); - locationModesById.remove(loc.getId()); - locationTypes.remove(loc.getId()); - } - - /** - * Adds this location event to the usage record for the given location (creating the usage - * record if one does not already exist). - */ - private void recordLocationEvent(LocationInternal loc, Lifecycle state) { - try { - managementContext.getUsageManager().recordLocationEvent(loc, state); - } catch (RuntimeInterruptedException e) { - throw e; - } catch (RuntimeException e) { - log.warn("Failed to store location lifecycle event for "+loc+" (ignoring)", e); - } - } - - private void recursively(Location e, Predicate<AbstractLocation> action) { - boolean success = action.apply( (AbstractLocation)e ); - if (!success) { - return; // Don't manage children if action false/unnecessary for parent - } - for (Location child : e.getChildren()) { - recursively(child, action); - } - } - - /** - * Should ensure that the location is now managed somewhere, and known about in all the lists. - * Returns true if the location has now become managed; false if it was already managed (anything else throws exception) - * @param rebindPrimary true if rebinding primary, false if rebinding as copy, null if creating (not rebinding) - */ - private synchronized boolean manageNonRecursive(Location loc, ManagementTransitionMode mode) { - Location old = locationsById.put(loc.getId(), loc); - preRegisteredLocationsById.remove(loc.getId()); - - locationTypes.put(loc.getId(), loc.getClass().getName()); - - if (old!=null && mode.wasNotLoaded()) { - if (old.equals(loc)) { - log.warn("{} redundant call to start management of location {}", this, loc); - } else { - throw new IllegalStateException("call to manage location "+loc+" but different location "+old+" already known under that id at "+this); - } - return false; - } - - if (old!=null && old!=loc) { - // passing the transition info will ensure the right shutdown steps invoked for old instance - unmanage(old, mode, true); - } - - return true; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private synchronized void unmanageNonRecursiveClearItsFields(Location loc, ManagementTransitionMode mode) { - if (mode.isDestroying()) { - ((AbstractLocation)loc).setParent(null, true); - - Location parent = ((AbstractLocation)loc).getParent(); - if (parent instanceof ProvisioningLocation<?>) { - try { - ((ProvisioningLocation)parent).release(loc); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - log.debug("Error releasing "+loc+" in its parent "+parent+": "+e); - } - } - } else { - // if not destroying, don't change the parent's children list - ((AbstractLocation)loc).setParent(null, false); - } - // clear config to help with GC; i know you're not supposed to, but this seems to help, else config bag is littered with refs to entities etc - // FIXME relies on config().getLocalBag() returning the underlying bag! - ((AbstractLocation)loc).config().getLocalBag().clear(); - } - - /** - * Should ensure that the location is no longer managed anywhere, remove from all lists. - * Returns true if the location has been removed from management; if it was not previously managed (anything else throws exception) - */ - private synchronized boolean unmanageNonRecursiveRemoveFromRecords(Location loc, ManagementTransitionMode mode) { - Object old = locationsById.remove(loc.getId()); - locationTypes.remove(loc.getId()); - locationModesById.remove(loc.getId()); - - if (old==null) { - log.warn("{} call to stop management of unknown location (already unmanaged?) {}; ignoring", this, loc); - return false; - } else if (!old.equals(loc)) { - // shouldn't happen... - log.error("{} call to stop management of location {} removed different location {}; ignoring", new Object[] { this, loc, old }); - return true; - } else { - if (log.isDebugEnabled()) log.debug("{} stopped management of location {}", this, loc); - return true; - } - } - - private boolean shouldSkipUnmanagement(Location loc) { - if (loc==null) { - log.warn(""+this+" call to unmanage null location; skipping", - new IllegalStateException("source of null unmanagement call to "+this)); - return true; - } - if (!isManaged(loc)) { - log.warn("{} call to stop management of unknown location (already unmanaged?) {}; skipping, and all descendants", this, loc); - return true; - } - return false; - } - - private boolean isRunning() { - return managementContext.isRunning(); - } - -}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java deleted file mode 100644 index 76500bc..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalManagementContext.java +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.WeakHashMap; -import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.brooklyn.api.effector.Effector; -import org.apache.brooklyn.api.entity.Application; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.mgmt.AccessController; -import org.apache.brooklyn.api.mgmt.ExecutionContext; -import org.apache.brooklyn.api.mgmt.ExecutionManager; -import org.apache.brooklyn.api.mgmt.ManagementContext; -import org.apache.brooklyn.api.mgmt.SubscriptionManager; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.api.mgmt.TaskAdaptable; -import org.apache.brooklyn.core.BrooklynFeatureEnablement; -import org.apache.brooklyn.core.effector.Effectors; -import org.apache.brooklyn.core.entity.drivers.downloads.BasicDownloadsManager; -import org.apache.brooklyn.core.internal.BrooklynProperties; -import org.apache.brooklyn.core.internal.BrooklynProperties.Factory.Builder; -import org.apache.brooklyn.core.internal.storage.DataGridFactory; -import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; -import org.apache.brooklyn.core.mgmt.ha.OsgiManager; -import org.apache.brooklyn.core.objs.proxy.InternalEntityFactory; -import org.apache.brooklyn.core.objs.proxy.InternalLocationFactory; -import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory; -import org.apache.brooklyn.util.core.task.BasicExecutionContext; -import org.apache.brooklyn.util.core.task.BasicExecutionManager; -import org.apache.brooklyn.util.core.task.DynamicTasks; -import org.apache.brooklyn.util.core.task.TaskTags; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.guava.Maybe; -import org.apache.brooklyn.util.text.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableSet; - -/** - * A local (single node) implementation of the {@link ManagementContext} API. - */ -public class LocalManagementContext extends AbstractManagementContext { - - private static final Logger log = LoggerFactory.getLogger(LocalManagementContext.class); - - private static final Set<LocalManagementContext> INSTANCES = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<LocalManagementContext, Boolean>())); - - private final Builder builder; - - private final List<ManagementContext.PropertiesReloadListener> reloadListeners = new CopyOnWriteArrayList<ManagementContext.PropertiesReloadListener>(); - - @VisibleForTesting - static Set<LocalManagementContext> getInstances() { - synchronized (INSTANCES) { - return ImmutableSet.copyOf(INSTANCES); - } - } - - // Note also called reflectively by BrooklynLeakListener - public static void logAll(Logger logger){ - for (LocalManagementContext context : getInstances()) { - logger.warn("Management Context "+context+" running, creation stacktrace:\n" + Throwables.getStackTraceAsString(context.constructionStackTrace)); - } - } - - /** terminates all (best effort); returns count of sessions closed; if exceptions thrown, returns negative number. - * semantics might change, particular in dealing with interminable mgmt contexts. */ - // Note also called reflectively by BrooklynLeakListener - @Beta - public static int terminateAll() { - int closed=0,dangling=0; - for (LocalManagementContext context : getInstances()) { - try { - context.terminate(); - closed++; - }catch (Throwable t) { - Exceptions.propagateIfFatal(t); - log.warn("Failed to terminate management context", t); - dangling++; - } - } - if (dangling>0) return -dangling; - return closed; - } - - private AtomicBoolean terminated = new AtomicBoolean(false); - private String managementPlaneId; - private String managementNodeId; - private BasicExecutionManager execution; - private SubscriptionManager subscriptions; - private LocalEntityManager entityManager; - private final LocalLocationManager locationManager; - private final LocalAccessManager accessManager; - private final LocalUsageManager usageManager; - private OsgiManager osgiManager; - - public final Throwable constructionStackTrace = new Throwable("for construction stacktrace").fillInStackTrace(); - - private final Map<String, Object> brooklynAdditionalProperties; - - /** - * Creates a LocalManagement with default BrooklynProperties. - */ - public LocalManagementContext() { - this(BrooklynProperties.Factory.builderDefault()); - } - - public LocalManagementContext(BrooklynProperties brooklynProperties) { - this(brooklynProperties, (DataGridFactory)null); - } - - /** - * Creates a new LocalManagementContext. - * - * @param brooklynProperties the BrooklynProperties. - * @param datagridFactory the DataGridFactory to use. If this instance is null, it means that the system - * is going to use BrooklynProperties to figure out which instance to load or otherwise - * use a default instance. - */ - @VisibleForTesting - public LocalManagementContext(BrooklynProperties brooklynProperties, DataGridFactory datagridFactory) { - this(Builder.fromProperties(brooklynProperties), datagridFactory); - } - - public LocalManagementContext(Builder builder) { - this(builder, null, null); - } - - public LocalManagementContext(Builder builder, DataGridFactory datagridFactory) { - this(builder, null, datagridFactory); - } - - public LocalManagementContext(Builder builder, Map<String, Object> brooklynAdditionalProperties) { - this(builder, brooklynAdditionalProperties, null); - } - - public LocalManagementContext(BrooklynProperties brooklynProperties, Map<String, Object> brooklynAdditionalProperties) { - this(Builder.fromProperties(brooklynProperties), brooklynAdditionalProperties, null); - } - - public LocalManagementContext(Builder builder, Map<String, Object> brooklynAdditionalProperties, DataGridFactory datagridFactory) { - super(builder.build(), datagridFactory); - - checkNotNull(configMap, "brooklynProperties"); - - // TODO in a persisted world the planeId may be injected - this.managementPlaneId = Strings.makeRandomId(8); - this.managementNodeId = Strings.makeRandomId(8); - this.builder = builder; - this.brooklynAdditionalProperties = brooklynAdditionalProperties; - if (brooklynAdditionalProperties != null) - configMap.addFromMap(brooklynAdditionalProperties); - - BrooklynFeatureEnablement.init(configMap); - - this.locationManager = new LocalLocationManager(this); - this.accessManager = new LocalAccessManager(); - this.usageManager = new LocalUsageManager(this); - - if (configMap.getConfig(OsgiManager.USE_OSGI)) { - this.osgiManager = new OsgiManager(this); - osgiManager.start(); - } - - INSTANCES.add(this); - log.debug("Created management context "+this); - } - - @Override - public String getManagementPlaneId() { - return managementPlaneId; - } - - @Override - public String getManagementNodeId() { - return managementNodeId; - } - - @Override - public void prePreManage(Entity entity) { - getEntityManager().prePreManage(entity); - } - - @Override - public void prePreManage(Location location) { - getLocationManager().prePreManage(location); - } - - @Override - public synchronized Collection<Application> getApplications() { - return getEntityManager().getApplications(); - } - - @Override - public void addEntitySetListener(CollectionChangeListener<Entity> listener) { - getEntityManager().addEntitySetListener(listener); - } - - @Override - public void removeEntitySetListener(CollectionChangeListener<Entity> listener) { - getEntityManager().removeEntitySetListener(listener); - } - - @Override - protected void manageIfNecessary(Entity entity, Object context) { - getEntityManager().manageIfNecessary(entity, context); - } - - @Override - public synchronized LocalEntityManager getEntityManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - - if (entityManager == null) { - entityManager = new LocalEntityManager(this); - } - return entityManager; - } - - @Override - public InternalEntityFactory getEntityFactory() { - return getEntityManager().getEntityFactory(); - } - - @Override - public InternalLocationFactory getLocationFactory() { - return getLocationManager().getLocationFactory(); - } - - @Override - public InternalPolicyFactory getPolicyFactory() { - return getEntityManager().getPolicyFactory(); - } - - @Override - public synchronized LocalLocationManager getLocationManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return locationManager; - } - - @Override - public synchronized LocalAccessManager getAccessManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return accessManager; - } - - @Override - public synchronized LocalUsageManager getUsageManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - return usageManager; - } - - @Override - public synchronized Maybe<OsgiManager> getOsgiManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - if (osgiManager==null) return Maybe.absent("OSGi not available in this instance"); - return Maybe.of(osgiManager); - } - - @Override - public synchronized AccessController getAccessController() { - return getAccessManager().getAccessController(); - } - - @Override - public synchronized SubscriptionManager getSubscriptionManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - - if (subscriptions == null) { - subscriptions = new LocalSubscriptionManager(getExecutionManager()); - } - return subscriptions; - } - - @Override - public synchronized ExecutionManager getExecutionManager() { - if (!isRunning()) throw new IllegalStateException("Management context no longer running"); - - if (execution == null) { - execution = new BasicExecutionManager(getManagementNodeId()); - gc = new BrooklynGarbageCollector(configMap, execution, getStorage()); - } - return execution; - } - - @Override - public void terminate() { - synchronized (terminated) { - if (terminated.getAndSet(true)) { - log.trace("Already terminated management context "+this); - // no harm in doing it twice, but it makes logs ugly! - return; - } - log.debug("Terminating management context "+this); - - INSTANCES.remove(this); - super.terminate(); - if (osgiManager!=null) { - osgiManager.stop(); - osgiManager = null; - } - if (usageManager != null) usageManager.terminate(); - if (execution != null) execution.shutdownNow(); - if (gc != null) gc.shutdownNow(); - - log.debug("Terminated management context "+this); - } - } - - @Override - protected void finalize() { - terminate(); - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public <T> Task<T> runAtEntity(Map flags, Entity entity, Callable<T> c) { - manageIfNecessary(entity, elvis(Arrays.asList(flags.get("displayName"), flags.get("description"), flags, c))); - return runAtEntity(entity, Tasks.<T>builder().dynamic(true).body(c).flags(flags).build()); - } - - protected <T> Task<T> runAtEntity(Entity entity, TaskAdaptable<T> task) { - getExecutionContext(entity).submit(task); - if (DynamicTasks.getTaskQueuingContext()!=null) { - // put it in the queueing context so it appears in the GUI - // mark it inessential as this is being invoked from code, - // the caller will do 'get' to handle errors - TaskTags.markInessential(task); - DynamicTasks.getTaskQueuingContext().queue(task.asTask()); - } - return task.asTask(); - } - - @Override - protected <T> Task<T> runAtEntity(final Entity entity, final Effector<T> eff, @SuppressWarnings("rawtypes") final Map parameters) { - manageIfNecessary(entity, eff); - // prefer to submit this from the current execution context so it sets up correct cross-context chaining - ExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext(); - if (ec == null) { - log.debug("Top-level effector invocation: {} on {}", eff, entity); - ec = getExecutionContext(entity); - } - return runAtEntity(entity, Effectors.invocation(entity, eff, parameters)); - } - - @Override - public boolean isManagedLocally(Entity e) { - return true; - } - - @Override - public String toString() { - return LocalManagementContext.class.getSimpleName()+"["+getManagementPlaneId()+"-"+getManagementNodeId()+"]"; - } - - @Override - public void reloadBrooklynProperties() { - log.info("Reloading brooklyn properties from " + builder); - if (builder.hasDelegateOriginalProperties()) - log.warn("When reloading, mgmt context "+this+" properties are fixed, so reload will be of limited utility"); - - BrooklynProperties properties = builder.build(); - configMap = new DeferredBrooklynProperties(properties, this); - if (brooklynAdditionalProperties != null) { - log.info("Reloading additional brooklyn properties from " + brooklynAdditionalProperties); - configMap.addFromMap(brooklynAdditionalProperties); - } - this.downloadsManager = BasicDownloadsManager.newDefault(configMap); - this.entitlementManager = Entitlements.newManager(this, configMap); - - clearLocationRegistry(); - - BrooklynFeatureEnablement.init(configMap); - - // Notify listeners that properties have been reloaded - for (PropertiesReloadListener listener : reloadListeners) { - listener.reloaded(); - } - } - - @VisibleForTesting - public void clearLocationRegistry() { - // Force reload of location registry - this.locationRegistry = null; - } - - @Override - public void addPropertiesReloadListener(PropertiesReloadListener listener) { - reloadListeners.add(checkNotNull(listener, "listener")); - } - - @Override - public void removePropertiesReloadListener(PropertiesReloadListener listener) { - reloadListeners.remove(listener); - } - - public void noteStartupComplete() { - startupComplete = true; - } -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java deleted file mode 100644 index 7743995..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis; -import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth; -import static org.apache.brooklyn.util.JavaGroovyEquivalents.join; -import static org.apache.brooklyn.util.JavaGroovyEquivalents.mapOf; - -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.mgmt.ExecutionManager; -import org.apache.brooklyn.api.mgmt.SubscriptionHandle; -import org.apache.brooklyn.api.mgmt.SubscriptionManager; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.api.sensor.Sensor; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.sensor.BasicSensorEvent; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.task.BasicExecutionManager; -import org.apache.brooklyn.util.core.task.SingleThreadedScheduler; -import org.apache.brooklyn.util.text.Identifiers; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Predicate; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimaps; - -/** - * A {@link SubscriptionManager} that stores subscription details locally. - */ -public class LocalSubscriptionManager extends AbstractSubscriptionManager { - - private static final Logger LOG = LoggerFactory.getLogger(LocalSubscriptionManager.class); - - protected final ExecutionManager em; - - private final String tostring = "SubscriptionContext("+Identifiers.getBase64IdFromValue(System.identityHashCode(this), 5)+")"; - - private final AtomicLong totalEventsPublishedCount = new AtomicLong(); - private final AtomicLong totalEventsDeliveredCount = new AtomicLong(); - - @SuppressWarnings("rawtypes") - protected final ConcurrentMap<String, Subscription> allSubscriptions = new ConcurrentHashMap<String, Subscription>(); - @SuppressWarnings("rawtypes") - protected final ConcurrentMap<Object, Set<Subscription>> subscriptionsBySubscriber = new ConcurrentHashMap<Object, Set<Subscription>>(); - @SuppressWarnings("rawtypes") - protected final ConcurrentMap<Object, Set<Subscription>> subscriptionsByToken = new ConcurrentHashMap<Object, Set<Subscription>>(); - - public LocalSubscriptionManager(ExecutionManager m) { - this.em = m; - } - - public long getNumSubscriptions() { - return allSubscriptions.size(); - } - - public long getTotalEventsPublished() { - return totalEventsPublishedCount.get(); - } - - public long getTotalEventsDelivered() { - return totalEventsDeliveredCount.get(); - } - - @SuppressWarnings("unchecked") - protected synchronized <T> SubscriptionHandle subscribe(Map<String, Object> flags, final Subscription<T> s) { - Entity producer = s.producer; - Sensor<T> sensor= s.sensor; - s.subscriber = getSubscriber(flags, s); - if (flags.containsKey("subscriberExecutionManagerTag")) { - s.subscriberExecutionManagerTag = flags.remove("subscriberExecutionManagerTag"); - s.subscriberExecutionManagerTagSupplied = true; - } else { - s.subscriberExecutionManagerTag = - s.subscriber instanceof Entity ? "subscription-delivery-entity-"+((Entity)s.subscriber).getId()+"["+s.subscriber+"]" : - s.subscriber instanceof String ? "subscription-delivery-string["+s.subscriber+"]" : - "subscription-delivery-object["+s.subscriber+"]"; - s.subscriberExecutionManagerTagSupplied = false; - } - s.eventFilter = (Predicate<SensorEvent<T>>) flags.remove("eventFilter"); - boolean notifyOfInitialValue = Boolean.TRUE.equals(flags.remove("notifyOfInitialValue")); - s.flags = flags; - - if (LOG.isDebugEnabled()) LOG.debug("Creating subscription {} for {} on {} {} in {}", new Object[] {s.id, s.subscriber, producer, sensor, this}); - allSubscriptions.put(s.id, s); - addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor), s); - if (s.subscriber!=null) { - addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s); - } - if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) { - ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class); - } - - if (notifyOfInitialValue) { - if (producer == null) { - LOG.warn("Cannot notifyOfInitialValue for subscription with wildcard producer: "+s); - } else if (sensor == null) { - LOG.warn("Cannot notifyOfInitialValue for subscription with wilcard sensor: "+s); - } else if (!(sensor instanceof AttributeSensor)) { - LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s); - } else { - if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s}); - Map<String, Object> tagsMap = MutableMap.of("tag", s.subscriberExecutionManagerTag); - em.submit(tagsMap, new Runnable() { - @Override - public String toString() { - return "LSM.publishInitialValue("+s.producer+", "+s.sensor+")"; - } - public void run() { - Object val = s.producer.getAttribute((AttributeSensor<?>) s.sensor); - @SuppressWarnings("rawtypes") // TODO s.listener.onEvent gives compilation error if try to use <T> - SensorEvent event = new BasicSensorEvent(s.sensor, s.producer, val); - if (s.eventFilter!=null && !s.eventFilter.apply(event)) - return; - try { - s.listener.onEvent(event); - } catch (Throwable t) { - if (event!=null && event.getSource()!=null && Entities.isNoLongerManaged(event.getSource())) { - LOG.debug("Error processing initial-value subscription to "+LocalSubscriptionManager.this+", after entity unmanaged: "+t, t); - } else { - LOG.warn("Error processing initial-value subscription to "+LocalSubscriptionManager.this+": "+t, t); - } - } - }}); - } - } - - return s; - } - - @SuppressWarnings("unchecked") - public Set<SubscriptionHandle> getSubscriptionsForSubscriber(Object subscriber) { - return (Set<SubscriptionHandle>) ((Set<?>) elvis(subscriptionsBySubscriber.get(subscriber), Collections.emptySet())); - } - - public synchronized Set<SubscriptionHandle> getSubscriptionsForEntitySensor(Entity source, Sensor<?> sensor) { - Set<SubscriptionHandle> subscriptions = new LinkedHashSet<SubscriptionHandle>(); - subscriptions.addAll(elvis(subscriptionsByToken.get(makeEntitySensorToken(source, sensor)), Collections.emptySet())); - subscriptions.addAll(elvis(subscriptionsByToken.get(makeEntitySensorToken(null, sensor)), Collections.emptySet())); - subscriptions.addAll(elvis(subscriptionsByToken.get(makeEntitySensorToken(source, null)), Collections.emptySet())); - subscriptions.addAll(elvis(subscriptionsByToken.get(makeEntitySensorToken(null, null)), Collections.emptySet())); - return subscriptions; - } - - /** - * Unsubscribe the given subscription id. - * - * @see #subscribe(Map, Entity, Sensor, SensorEventListener) - */ - @SuppressWarnings("rawtypes") - public synchronized boolean unsubscribe(SubscriptionHandle sh) { - if (!(sh instanceof Subscription)) throw new IllegalArgumentException("Only subscription handles of type Subscription supported: sh="+sh+"; type="+(sh != null ? sh.getClass().getCanonicalName() : null)); - Subscription s = (Subscription) sh; - boolean result = allSubscriptions.remove(s.id) != null; - boolean b2 = removeFromMapOfCollections(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor), s); - assert result==b2; - if (s.subscriber!=null) { - boolean b3 = removeFromMapOfCollections(subscriptionsBySubscriber, s.subscriber, s); - assert b3 == b2; - } - - // FIXME ALEX - this seems wrong - ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class); - return result; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public <T> void publish(final SensorEvent<T> event) { - // REVIEW 1459 - execution - - // delivery in parallel/background, using execution manager - - // subscriptions, should define SingleThreadedScheduler for any subscriber ID tag - // in order to ensure callbacks are invoked in the order they are submitted - // (recommend exactly one per subscription to prevent deadlock) - // this is done with: - // em.setTaskSchedulerForTag(subscriberId, SingleThreadedScheduler.class); - - //note, generating the notifications must be done in the calling thread to preserve order - //e.g. emit(A); emit(B); should cause onEvent(A); onEvent(B) in that order - if (LOG.isTraceEnabled()) LOG.trace("{} got event {}", this, event); - totalEventsPublishedCount.incrementAndGet(); - - Set<Subscription> subs = (Set<Subscription>) ((Set<?>) getSubscriptionsForEntitySensor(event.getSource(), event.getSensor())); - if (groovyTruth(subs)) { - if (LOG.isTraceEnabled()) LOG.trace("sending {}, {} to {}", new Object[] {event.getSensor().getName(), event, join(subs, ",")}); - for (Subscription s : subs) { - if (s.eventFilter!=null && !s.eventFilter.apply(event)) - continue; - final Subscription sAtClosureCreation = s; - -// Set<Object> tags = MutableSet.of(); -// if (s.subscriberExecutionManagerTag!=null) tags.add(s.subscriberExecutionManagerTag); -// if (event.getSource()!=null) tags.add(BrooklynTaskTags.tagForContextEntity(event.getSource())); -// Map<String, Object> tagsMap = mapOf("tags", (Object)tags); - // use code above, instead of line below, if we want subscription deliveries associated with the entity; - // that will cause them to be cancelled when the entity is unmanaged - // (not sure that is useful, and likely NOT worth the expense, but it might be...) -Alex Oct 2014 - Map<String, Object> tagsMap = mapOf("tag", s.subscriberExecutionManagerTag); - - em.submit(tagsMap, new Runnable() { - @Override - public String toString() { - return "LSM.publish("+event+")"; - } - public void run() { - try { - sAtClosureCreation.listener.onEvent(event); - } catch (Throwable t) { - if (event!=null && event.getSource()!=null && Entities.isNoLongerManaged(event.getSource())) { - LOG.debug("Error processing subscriptions to "+this+", after entity unmanaged: "+t, t); - } else { - LOG.warn("Error processing subscriptions to "+this+": "+t, t); - } - } - }}); - totalEventsDeliveredCount.incrementAndGet(); - } - } - } - - @Override - public String toString() { - return tostring; - } - - /** - * Copied from LanguageUtils.groovy, to remove dependency. - * - * Adds the given value to a collection in the map under the key. - * - * A collection (as {@link LinkedHashMap}) will be created if necessary, - * synchronized on map for map access/change and set for addition there - * - * @return the updated set (instance, not copy) - * - * @deprecated since 0.5; use {@link HashMultimap}, and {@link Multimaps#synchronizedSetMultimap(com.google.common.collect.SetMultimap)} - */ - @Deprecated - private static <K,V> Set<V> addToMapOfSets(Map<K,Set<V>> map, K key, V valueInCollection) { - Set<V> coll; - synchronized (map) { - coll = map.get(key); - if (coll==null) { - coll = new LinkedHashSet<V>(); - map.put(key, coll); - } - if (coll.isEmpty()) { - synchronized (coll) { - coll.add(valueInCollection); - } - //if collection was empty then add to the collection while holding the map lock, to prevent removal - return coll; - } - } - synchronized (coll) { - if (!coll.isEmpty()) { - coll.add(valueInCollection); - return coll; - } - } - //if was empty, recurse, because someone else might be removing the collection - return addToMapOfSets(map, key, valueInCollection); - } - - /** - * Copied from LanguageUtils.groovy, to remove dependency. - * - * Removes the given value from a collection in the map under the key. - * - * @return the updated set (instance, not copy) - * - * @deprecated since 0.5; use {@link ArrayListMultimap} or {@link HashMultimap}, and {@link Multimaps#synchronizedListMultimap(com.google.common.collect.ListMultimap)} etc - */ - @Deprecated - private static <K,V> boolean removeFromMapOfCollections(Map<K,? extends Collection<V>> map, K key, V valueInCollection) { - Collection<V> coll; - synchronized (map) { - coll = map.get(key); - if (coll==null) return false; - } - boolean result; - synchronized (coll) { - result = coll.remove(valueInCollection); - } - if (coll.isEmpty()) { - synchronized (map) { - synchronized (coll) { - if (coll.isEmpty()) { - //only remove from the map if no one is adding to the collection or to the map, and the collection is still in the map - if (map.get(key)==coll) { - map.remove(key); - } - } - } - } - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalUsageManager.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalUsageManager.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalUsageManager.java deleted file mode 100644 index 363009e..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalUsageManager.java +++ /dev/null @@ -1,411 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.brooklyn.api.entity.Application; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; -import org.apache.brooklyn.core.internal.storage.BrooklynStorage; -import org.apache.brooklyn.core.location.AbstractLocation; -import org.apache.brooklyn.core.location.LocationConfigKeys; -import org.apache.brooklyn.core.location.internal.LocationInternal; -import org.apache.brooklyn.core.mgmt.ManagementContextInjectable; -import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; -import org.apache.brooklyn.core.mgmt.usage.ApplicationUsage; -import org.apache.brooklyn.core.mgmt.usage.LocationUsage; -import org.apache.brooklyn.core.mgmt.usage.UsageListener; -import org.apache.brooklyn.core.mgmt.usage.UsageManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.util.core.flags.TypeCoercions; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.javalang.Reflections; -import org.apache.brooklyn.util.time.Duration; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -public class LocalUsageManager implements UsageManager { - - // TODO Threading model needs revisited. - // Synchronizes on updates to storage; but if two Brooklyn nodes were both writing to the same - // ApplicationUsage or LocationUsage record there'd be a race. That currently won't happen - // (at least for ApplicationUsage?) because the app is mastered in just one node at a time, - // and because location events are just manage/unmanage which should be happening in just - // one place at a time for a given location. - - private static final Logger log = LoggerFactory.getLogger(LocalUsageManager.class); - - private static class ApplicationMetadataImpl implements UsageListener.ApplicationMetadata { - private final Application app; - private String applicationId; - private String applicationName; - private String entityType; - private String catalogItemId; - private Map<String, String> metadata; - - ApplicationMetadataImpl(Application app) { - this.app = checkNotNull(app, "app"); - applicationId = app.getId(); - applicationName = app.getDisplayName(); - entityType = app.getEntityType().getName(); - catalogItemId = app.getCatalogItemId(); - metadata = ((EntityInternal)app).toMetadataRecord(); - } - @Override public Application getApplication() { - return app; - } - @Override public String getApplicationId() { - return applicationId; - } - @Override public String getApplicationName() { - return applicationName; - } - @Override public String getEntityType() { - return entityType; - } - @Override public String getCatalogItemId() { - return catalogItemId; - } - @Override public Map<String, String> getMetadata() { - return metadata; - } - } - - private static class LocationMetadataImpl implements UsageListener.LocationMetadata { - private final Location loc; - private String locationId; - private Map<String, String> metadata; - - LocationMetadataImpl(Location loc) { - this.loc = checkNotNull(loc, "loc"); - locationId = loc.getId(); - metadata = ((LocationInternal)loc).toMetadataRecord(); - } - @Override public Location getLocation() { - return loc; - } - @Override public String getLocationId() { - return locationId; - } - @Override public Map<String, String> getMetadata() { - return metadata; - } - } - - // Register a coercion from String->UsageListener, so that USAGE_LISTENERS defined in brooklyn.properties - // will be instantiated, given their class names. - static { - TypeCoercions.registerAdapter(String.class, UsageListener.class, new Function<String, UsageListener>() { - @Override public UsageListener apply(String input) { - // TODO Want to use classLoader = mgmt.getCatalog().getRootClassLoader(); - ClassLoader classLoader = LocalUsageManager.class.getClassLoader(); - Optional<Object> result = Reflections.invokeConstructorWithArgs(classLoader, input); - if (result.isPresent()) { - return (UsageListener) result.get(); - } else { - throw new IllegalStateException("Failed to create UsageListener from class name '"+input+"' using no-arg constructor"); - } - } - }); - } - - @VisibleForTesting - public static final String APPLICATION_USAGE_KEY = "usage-application"; - - @VisibleForTesting - public static final String LOCATION_USAGE_KEY = "usage-location"; - - private final LocalManagementContext managementContext; - - private final Object mutex = new Object(); - - private final List<UsageListener> listeners = Lists.newCopyOnWriteArrayList(); - - private final AtomicInteger listenerQueueSize = new AtomicInteger(); - - private ListeningExecutorService listenerExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("brooklyn-usagemanager-listener-%d") - .build())); - - public LocalUsageManager(LocalManagementContext managementContext) { - this.managementContext = checkNotNull(managementContext, "managementContext"); - - // TODO Once org.apache.brooklyn.core.management.internal.UsageManager.UsageListener is deleted, restore this - // to normal generics! - Collection<?> listeners = managementContext.getBrooklynProperties().getConfig(UsageManager.USAGE_LISTENERS); - if (listeners != null) { - for (Object listener : listeners) { - if (listener instanceof ManagementContextInjectable) { - ((ManagementContextInjectable)listener).setManagementContext(managementContext); - } - if (listener instanceof UsageListener) { - addUsageListener((UsageListener)listener); - } else if (listener == null) { - throw new NullPointerException("null listener in config "+UsageManager.USAGE_LISTENERS); - } else { - throw new ClassCastException("listener "+listener+" of type "+listener.getClass()+" is not of type "+UsageListener.class.getName()); - } - } - } - } - - public void terminate() { - // Wait for the listeners to finish + close the listeners - Duration timeout = managementContext.getBrooklynProperties().getConfig(UsageManager.USAGE_LISTENER_TERMINATION_TIMEOUT); - if (listenerQueueSize.get() > 0) { - log.info("Usage manager waiting for "+listenerQueueSize+" listener events for up to "+timeout); - } - List<ListenableFuture<?>> futures = Lists.newArrayList(); - for (final UsageListener listener : listeners) { - ListenableFuture<?> future = listenerExecutor.submit(new Runnable() { - public void run() { - if (listener instanceof Closeable) { - try { - ((Closeable)listener).close(); - } catch (IOException e) { - log.warn("Problem closing usage listener "+listener+" (continuing)", e); - } - } - }}); - futures.add(future); - } - try { - Futures.successfulAsList(futures).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - } catch (Exception e) { - Exceptions.propagateIfFatal(e); - log.warn("Problem terminiating usage listeners (continuing)", e); - } finally { - listenerExecutor.shutdownNow(); - } - } - - private void execOnListeners(final Function<UsageListener, Void> job) { - for (final UsageListener listener : listeners) { - listenerQueueSize.incrementAndGet(); - listenerExecutor.execute(new Runnable() { - public void run() { - try { - job.apply(listener); - } catch (RuntimeException e) { - log.error("Problem notifying listener "+listener+" of "+job, e); - Exceptions.propagateIfFatal(e); - } finally { - listenerQueueSize.decrementAndGet(); - } - }}); - } - } - - @Override - public void recordApplicationEvent(final Application app, final Lifecycle state) { - log.debug("Storing application lifecycle usage event: application {} in state {}", new Object[] {app, state}); - ConcurrentMap<String, ApplicationUsage> eventMap = managementContext.getStorage().getMap(APPLICATION_USAGE_KEY); - synchronized (mutex) { - ApplicationUsage usage = eventMap.get(app.getId()); - if (usage == null) { - usage = new ApplicationUsage(app.getId(), app.getDisplayName(), app.getEntityType().getName(), ((EntityInternal)app).toMetadataRecord()); - } - final ApplicationUsage.ApplicationEvent event = new ApplicationUsage.ApplicationEvent(state, getUser()); - usage.addEvent(event); - eventMap.put(app.getId(), usage); - - execOnListeners(new Function<UsageListener, Void>() { - public Void apply(UsageListener listener) { - listener.onApplicationEvent(new ApplicationMetadataImpl(Entities.proxy(app)), event); - return null; - } - public String toString() { - return "applicationEvent("+app+", "+state+")"; - }}); - } - } - - /** - * Adds this location event to the usage record for the given location (creating the usage - * record if one does not already exist). - */ - @Override - public void recordLocationEvent(final Location loc, final Lifecycle state) { - // TODO This approach (i.e. recording events on manage/unmanage would not work for - // locations that are reused. For example, in a FixedListMachineProvisioningLocation - // the ssh machine location is returned to the pool and handed back out again. - // But maybe the solution there is to hand out different instances so that one user - // can't change the config of the SshMachineLocation to subsequently affect the next - // user. - // - // TODO Should perhaps extract the location storage methods into their own class, - // but no strong enough feelings yet... - - checkNotNull(loc, "location"); - if (loc.getConfig(AbstractLocation.TEMPORARY_LOCATION)) { - log.info("Ignoring location lifecycle usage event for {} (state {}), because location is a temporary location", loc, state); - return; - } - checkNotNull(state, "state of location %s", loc); - if (loc.getId() == null) { - log.error("Ignoring location lifecycle usage event for {} (state {}), because location has no id", loc, state); - return; - } - if (managementContext.getStorage() == null) { - log.warn("Cannot store location lifecycle usage event for {} (state {}), because storage not available", loc, state); - return; - } - - Object callerContext = loc.getConfig(LocationConfigKeys.CALLER_CONTEXT); - - if (callerContext != null && callerContext instanceof Entity) { - log.debug("Storing location lifecycle usage event: location {} in state {}; caller context {}", new Object[] {loc, state, callerContext}); - - Entity caller = (Entity) callerContext; - String entityTypeName = caller.getEntityType().getName(); - String appId = caller.getApplicationId(); - - final LocationUsage.LocationEvent event = new LocationUsage.LocationEvent(state, caller.getId(), entityTypeName, appId, getUser()); - - ConcurrentMap<String, LocationUsage> usageMap = managementContext.getStorage().<String, LocationUsage>getMap(LOCATION_USAGE_KEY); - synchronized (mutex) { - LocationUsage usage = usageMap.get(loc.getId()); - if (usage == null) { - usage = new LocationUsage(loc.getId(), ((LocationInternal)loc).toMetadataRecord()); - } - usage.addEvent(event); - usageMap.put(loc.getId(), usage); - - execOnListeners(new Function<UsageListener, Void>() { - public Void apply(UsageListener listener) { - listener.onLocationEvent(new LocationMetadataImpl(loc), event); - return null; - } - public String toString() { - return "locationEvent("+loc+", "+state+")"; - }}); - } - } else { - // normal for high-level locations - log.trace("Not recording location lifecycle usage event for {} in state {}, because no caller context", new Object[] {loc, state}); - } - } - - /** - * Returns the usage info for the location with the given id, or null if unknown. - */ - @Override - public LocationUsage getLocationUsage(String locationId) { - BrooklynStorage storage = managementContext.getStorage(); - - Map<String, LocationUsage> usageMap = storage.getMap(LOCATION_USAGE_KEY); - return usageMap.get(locationId); - } - - /** - * Returns the usage info that matches the given predicate. - * For example, could be used to find locations used within a given time period. - */ - @Override - public Set<LocationUsage> getLocationUsage(Predicate<? super LocationUsage> filter) { - // TODO could do more efficient indexing, to more easily find locations in use during a given period. - // But this is good enough for first-pass. - - Map<String, LocationUsage> usageMap = managementContext.getStorage().getMap(LOCATION_USAGE_KEY); - Set<LocationUsage> result = Sets.newLinkedHashSet(); - - for (LocationUsage usage : usageMap.values()) { - if (filter.apply(usage)) { - result.add(usage); - } - } - return result; - } - - /** - * Returns the usage info for the location with the given id, or null if unknown. - */ - @Override - public ApplicationUsage getApplicationUsage(String appId) { - BrooklynStorage storage = managementContext.getStorage(); - - Map<String, ApplicationUsage> usageMap = storage.getMap(APPLICATION_USAGE_KEY); - return usageMap.get(appId); - } - - /** - * Returns the usage info that matches the given predicate. - * For example, could be used to find applications used within a given time period. - */ - @Override - public Set<ApplicationUsage> getApplicationUsage(Predicate<? super ApplicationUsage> filter) { - // TODO could do more efficient indexing, to more easily find locations in use during a given period. - // But this is good enough for first-pass. - - Map<String, ApplicationUsage> usageMap = managementContext.getStorage().getMap(APPLICATION_USAGE_KEY); - Set<ApplicationUsage> result = Sets.newLinkedHashSet(); - - for (ApplicationUsage usage : usageMap.values()) { - if (filter.apply(usage)) { - result.add(usage); - } - } - return result; - } - - @Override - public void addUsageListener(UsageListener listener) { - listeners.add(listener); - } - - @Override - public void removeUsageListener(UsageListener listener) { - listeners.remove(listener); - } - - private String getUser() { - EntitlementContext entitlementContext = Entitlements.getEntitlementContext(); - if (entitlementContext != null) { - return entitlementContext.user(); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocationManagerInternal.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocationManagerInternal.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocationManagerInternal.java deleted file mode 100644 index d37c115..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocationManagerInternal.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.mgmt.LocationManager; - -public interface LocationManagerInternal extends LocationManager, BrooklynObjectManagerInternal<Location> { - - public Iterable<String> getLocationIds(); - -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementContextInternal.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementContextInternal.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementContextInternal.java deleted file mode 100644 index e76f2fb..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementContextInternal.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import java.net.URI; -import java.net.URL; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -import org.apache.brooklyn.api.effector.Effector; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.mgmt.ManagementContext; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.core.catalog.internal.CatalogInitialization; -import org.apache.brooklyn.core.internal.BrooklynProperties; -import org.apache.brooklyn.core.internal.storage.BrooklynStorage; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.core.mgmt.ha.OsgiManager; -import org.apache.brooklyn.core.mgmt.usage.UsageManager; -import org.apache.brooklyn.core.objs.proxy.InternalEntityFactory; -import org.apache.brooklyn.core.objs.proxy.InternalLocationFactory; -import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory; -import org.apache.brooklyn.util.core.task.TaskTags; -import org.apache.brooklyn.util.guava.Maybe; - -import com.google.common.annotations.Beta; - -public interface ManagementContextInternal extends ManagementContext { - - public static final String SUB_TASK_TAG = TaskTags.SUB_TASK_TAG; - - public static final String EFFECTOR_TAG = BrooklynTaskTags.EFFECTOR_TAG; - public static final String NON_TRANSIENT_TASK_TAG = BrooklynTaskTags.NON_TRANSIENT_TASK_TAG; - public static final String TRANSIENT_TASK_TAG = BrooklynTaskTags.TRANSIENT_TASK_TAG; - - public static final String EMPTY_CATALOG_URL = "classpath://brooklyn/empty.catalog.bom"; - - ClassLoader getBaseClassLoader(); - - Iterable<URL> getBaseClassPathForScanning(); - - void setBaseClassPathForScanning(Iterable<URL> urls); - - void setManagementNodeUri(URI uri); - - void addEntitySetListener(CollectionChangeListener<Entity> listener); - - void removeEntitySetListener(CollectionChangeListener<Entity> listener); - - void terminate(); - - long getTotalEffectorInvocations(); - - <T> T invokeEffectorMethodSync(final Entity entity, final Effector<T> eff, final Object args) throws ExecutionException; - - <T> Task<T> invokeEffector(final Entity entity, final Effector<T> eff, @SuppressWarnings("rawtypes") final Map parameters); - - BrooklynStorage getStorage(); - - BrooklynProperties getBrooklynProperties(); - - AccessManager getAccessManager(); - - UsageManager getUsageManager(); - - /** - * @return The OSGi manager, if available; may be absent if OSGi is not supported, - * e.g. in test contexts (but will be supported in all major contexts). - */ - Maybe<OsgiManager> getOsgiManager(); - - InternalEntityFactory getEntityFactory(); - - InternalLocationFactory getLocationFactory(); - - InternalPolicyFactory getPolicyFactory(); - - /** - * Registers an entity that has been created, but that has not yet begun to be managed. - * <p> - * This differs from the idea of "preManaged" where the entities are in the process of being - * managed, but where management is not yet complete. - */ - // TODO would benefit from better naming! The name has percolated up from LocalEntityManager. - // should we just rename here as register or preManage? - void prePreManage(Entity entity); - - /** - * Registers a location that has been created, but that has not yet begun to be managed. - */ - void prePreManage(Location location); - - /** Object which allows adding, removing, and clearing errors. - * TODO In future this will change to a custom interface with a unique identifier for each error. */ - @Beta - List<Throwable> errors(); - - @Beta - CatalogInitialization getCatalogInitialization(); - - @Beta - void setCatalogInitialization(CatalogInitialization catalogInitialization); - - @Beta - ExternalConfigSupplierRegistry getExternalConfigProviderRegistry(); - -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementTransitionInfo.java ---------------------------------------------------------------------- diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementTransitionInfo.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementTransitionInfo.java deleted file mode 100644 index 8fa567a..0000000 --- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/ManagementTransitionInfo.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.core.mgmt.internal; - -import org.apache.brooklyn.api.mgmt.ManagementContext; - -/** Stores a management transition mode, and the management context. */ -// TODO does this class really pull its weight? -public class ManagementTransitionInfo { - - final ManagementContext mgmtContext; - final ManagementTransitionMode mode; - - public ManagementTransitionInfo(ManagementContext mgmtContext, ManagementTransitionMode mode) { - this.mgmtContext = mgmtContext; - this.mode = mode; - } - - - public ManagementContext getManagementContext() { - return mgmtContext; - } - - public ManagementTransitionMode getMode() { - return mode; - } - - @Override - public String toString() { - return super.toString()+"["+mgmtContext+";"+mode+"]"; - } -}
