Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java?rev=1521521&r1=1521520&r2=1521521&view=diff ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java (original) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java Tue Sep 10 15:14:46 2013 @@ -24,21 +24,26 @@ import static org.apache.ace.agent.Agent import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_STREAMING; import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_SYNCDELAY; import static org.apache.ace.agent.AgentConstants.CONFIG_CONTROLLER_SYNCINTERVAL; - +import static org.apache.ace.agent.impl.InternalConstants.AGENT_CONFIG_CHANGED; +import static org.apache.ace.agent.impl.ConnectionUtil.*; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Collections; +import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ace.agent.DeploymentHandler; import org.apache.ace.agent.DownloadHandle; import org.apache.ace.agent.DownloadResult; import org.apache.ace.agent.DownloadState; +import org.apache.ace.agent.EventListener; import org.apache.ace.agent.FeedbackChannel; import org.apache.ace.agent.RetryAfterException; import org.osgi.framework.Version; @@ -47,45 +52,100 @@ import org.osgi.service.deploymentadmin. 
/** * Default configurable controller */ -public class DefaultController extends ComponentBase implements Runnable { +public class DefaultController extends ComponentBase implements Runnable, EventListener { private volatile ScheduledFuture<?> m_scheduledFuture; private volatile UpdateInstaller m_updateInstaller; + private final AtomicBoolean m_disabled; + private final AtomicBoolean m_updateStreaming; + private final AtomicBoolean m_fixPackage; + private final AtomicLong m_maxRetries; + private final AtomicLong m_interval; + private final AtomicLong m_syncDelay; + public DefaultController() { super("controller"); - } - @Override - protected void onStart() throws Exception { - long delay = getConfigurationHandler().getLong(CONFIG_CONTROLLER_SYNCDELAY, 5); - scheduleRun(delay); - logDebug("Controller scheduled to run in %d seconds", delay); + m_disabled = new AtomicBoolean(false); + m_interval = new AtomicLong(60); + m_syncDelay = new AtomicLong(5); + + m_updateStreaming = new AtomicBoolean(true); + m_fixPackage = new AtomicBoolean(true); + m_maxRetries = new AtomicLong(1); } @Override - protected void onStop() throws Exception { - if (m_updateInstaller != null) { - m_updateInstaller.reset(); + public void handle(String topic, Map<String, String> payload) { + if (AGENT_CONFIG_CHANGED.equals(topic)) { + String value = payload.get(CONFIG_CONTROLLER_DISABLED); + if (value != null && !"".equals(value)) { + m_disabled.set(Boolean.parseBoolean(value)); + } + + value = payload.get(CONFIG_CONTROLLER_STREAMING); + if (value != null && !"".equals(value)) { + m_updateStreaming.set(Boolean.parseBoolean(value)); + } + + value = payload.get(CONFIG_CONTROLLER_FIXPACKAGES); + if (value != null && !"".equals(value)) { + m_fixPackage.set(Boolean.parseBoolean(value)); + } + + value = payload.get(CONFIG_CONTROLLER_SYNCDELAY); + if (value != null && !"".equals(value)) { + try { + m_syncDelay.set(Long.parseLong(value)); + } + catch (NumberFormatException exception) { + // Ignore... 
+ } + } + + value = payload.get(CONFIG_CONTROLLER_RETRIES); + if (value != null && !"".equals(value)) { + try { + m_maxRetries.set(Long.parseLong(value)); + } + catch (NumberFormatException exception) { + // Ignore... + } + } + + value = payload.get(CONFIG_CONTROLLER_SYNCINTERVAL); + if (value != null && !"".equals(value)) { + try { + m_interval.set(Long.parseLong(value)); + } + catch (NumberFormatException exception) { + // Ignore... + } + } + + logDebug("Config changed: disabled: %s, update: %s, fixPkg: %s, syncDelay: %d, syncInterval: %d, maxRetries: %d", m_disabled.get(), m_updateStreaming.get(), m_fixPackage.get(), m_syncDelay.get(), m_interval.get(), m_maxRetries.get()); } - unscheduleRun(); } @Override public void run() { - boolean disabled = getConfigurationHandler().getBoolean(CONFIG_CONTROLLER_DISABLED, false); - long interval = getConfigurationHandler().getLong(CONFIG_CONTROLLER_SYNCINTERVAL, 60); - if (disabled) { - logDebug("Controller disabled by configuration. Skipping.."); - scheduleRun(interval); - return; - } + boolean disabled = m_disabled.get(); + long interval = m_interval.get(); - logDebug("Controller syncing..."); try { + if (disabled) { + logDebug("Controller disabled by configuration. Skipping..."); + return; + } + + logDebug("Controller syncing..."); + runFeedback(); runAgentUpdate(); runDeploymentUpdate(); + + logDebug("Sync completed. Rescheduled in %d seconds", interval); } catch (RetryAfterException e) { // any method may throw this causing the sync to abort. The server is busy so no sense in trying @@ -98,12 +158,39 @@ public class DefaultController extends C // we can do but log it as an error and reschedule as usual. logError("Sync aborted due to Exception.", e); } - scheduleRun(interval); - logDebug("Sync completed. 
Rescheduled in %d seconds", interval); + finally { + scheduleRun(interval); + } + } + + @Override + protected void onInit() throws Exception { + getEventsHandler().addListener(this); + } + + @Override + protected void onStart() throws Exception { + long delay = m_syncDelay.get(); + + scheduleRun(delay); + + logDebug("Controller scheduled to run in %d seconds", delay); + } + + @Override + protected void onStop() throws Exception { + getEventsHandler().removeListener(this); + + if (m_updateInstaller != null) { + m_updateInstaller.reset(); + } + + unscheduleRun(); } private void runFeedback() throws RetryAfterException { logDebug("Synchronizing feedback channels"); + Set<String> names = getFeedbackChannelNames(); for (String name : names) { FeedbackChannel channel = getFeedbackChannel(name); @@ -128,7 +215,7 @@ public class DefaultController extends C catch (IOException e) { // Probably a serious problem due to local IO related to feedback. No cause to abort the sync so we just log // it as an error. - logError("Exception while Looking up feedback channelnames. 
This is "); + logError("Exception while looking up feedback channel names."); } return Collections.emptySet(); } @@ -147,6 +234,7 @@ public class DefaultController extends C private void runAgentUpdate() throws RetryAfterException { logDebug("Checking for agent update"); + Version current = getAgentUpdateHandler().getInstalledVersion(); SortedSet<Version> available = getAvailableAgentVersions(); Version highest = Version.emptyVersion; @@ -160,6 +248,7 @@ public class DefaultController extends C } logInfo("Installing agent update %s => %s", current, highest); + InputStream inputStream = null; try { inputStream = getAgentUpdateHandler().getInputStream(highest); @@ -171,6 +260,9 @@ public class DefaultController extends C // FIXME Does not cover failed updates and should handle retries logWarning("Exception while installing agent update %s", e, highest); } + finally { + closeSilently(inputStream); + } } private SortedSet<Version> getAvailableAgentVersions() throws RetryAfterException { @@ -186,8 +278,8 @@ public class DefaultController extends C } private void runDeploymentUpdate() throws RetryAfterException { - logDebug("Checking for deployment update"); + Version current = getDeploymentHandler().getInstalledVersion(); SortedSet<Version> available = getAvailableDeploymentVersions(); Version highest = Version.emptyVersion; @@ -200,9 +292,9 @@ public class DefaultController extends C return; } - boolean updateStreaming = getConfigurationHandler().getBoolean(CONFIG_CONTROLLER_STREAMING, true); - boolean fixPackage = getConfigurationHandler().getBoolean(CONFIG_CONTROLLER_FIXPACKAGES, true); - long maxRetries = getConfigurationHandler().getLong(CONFIG_CONTROLLER_RETRIES, 1); + boolean updateStreaming = m_updateStreaming.get(); + boolean fixPackage = m_fixPackage.get(); + long maxRetries = m_maxRetries.get(); getUpdateInstaller(updateStreaming).installUpdate(current, highest, fixPackage, maxRetries); } @@ -248,7 +340,7 @@ public class DefaultController extends C private 
void unscheduleRun() { if (m_scheduledFuture != null) - m_scheduledFuture.cancel(true); + m_scheduledFuture.cancel(false /* mayInterruptWhileRunning */); } /** @@ -256,7 +348,6 @@ public class DefaultController extends C * delegates the rest to concrete implementations. */ abstract static class UpdateInstaller { - private final DefaultController m_controller; private Version m_lastVersion = null; private int m_failureCount = 0; @@ -315,14 +406,12 @@ public class DefaultController extends C * UpdateInstaller that provides streaming deployment package install. The install is blocking. */ static class StreamingUpdateInstaller extends UpdateInstaller { - public StreamingUpdateInstaller(DefaultController controller) { super(controller); } @Override public void doInstallUpdate(Version from, Version to, boolean fix) throws RetryAfterException, DeploymentException, IOException { - getController().logInfo("Installing streaming deployment update %s => %s", from, to); DeploymentHandler deploymentHandler = getController().getDeploymentHandler(); @@ -346,6 +435,7 @@ public class DefaultController extends C @Override protected void doReset() { + // Nop } } @@ -354,7 +444,6 @@ public class DefaultController extends C * completion this installer will reschedule the controller. */ static class DownloadUpdateInstaller extends UpdateInstaller implements DownloadHandle.ProgressListener, DownloadHandle.ResultListener { - // active download state private volatile DownloadHandle m_downloadHandle; private volatile DownloadResult m_downloadResult = null; @@ -431,7 +520,7 @@ public class DefaultController extends C @Override public void completed(DownloadResult result) { m_downloadResult = result; - getController().logInfo("Deployment package donwload completed for version %s. Rescheduling the controller to run in %d seconds", m_downloadVersion, 1); + getController().logInfo("Deployment package download completed for version %s. 
Rescheduling the controller to run in %d seconds", m_downloadVersion, 1); getController().scheduleRun(1); }
Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DependencyTrackerImpl.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DependencyTrackerImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DependencyTrackerImpl.java (original) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DependencyTrackerImpl.java Tue Sep 10 15:14:46 2013 @@ -18,10 +18,9 @@ */ package org.apache.ace.agent.impl; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; @@ -29,261 +28,278 @@ import org.osgi.framework.Filter; import org.osgi.framework.FrameworkUtil; import org.osgi.framework.ServiceReference; import org.osgi.util.tracker.ServiceTracker; -import org.osgi.util.tracker.ServiceTrackerCustomizer; /** * Simple service dependency tracker that tracks a number of required dependencies and provides life-cycle. */ public class DependencyTrackerImpl { - - interface LifecycleCallbacks { - void started(); - - void stopped(); - } - - interface DependencyCallback { + /** + * Called when an individual dependency is (no longer) satisfied. + */ + static interface DependencyCallback { + /** + * Called when a dependency is updated. + * + * @param service + * the new dependency, can be <code>null</code> in case the dependency is no longer available. 
+ */ void updated(Object service); } - private final Set<ServiceDependency> m_dependencies = new HashSet<ServiceDependency>(); - private final BundleContext m_bundleContext; - private final LifecycleCallbacks m_callbacks; - private volatile boolean m_tracking = false; - private volatile boolean m_started = false; - - public DependencyTrackerImpl(BundleContext bundleContext, LifecycleCallbacks callbacks) { - m_bundleContext = bundleContext; - m_callbacks = callbacks; + /** + * Callback interface for reporting the state of the tracked dependencies. + */ + static interface LifecycleCallback { + /** + * Called when all dependencies are satisfied. + */ + void componentStarted(BundleContext context) throws Exception; + + /** + * Called when one or more dependencies are no longer satisfied. + */ + void componentStopped(BundleContext context) throws Exception; } - public BundleContext getBundleContext() { - return m_bundleContext; - } + /** + * Represents an actual dependency on an OSGi service. + */ + private static class ServiceDependency { + private final DependencyTrackerImpl m_manager; + private final DependencyCallback m_calback; + private final ServiceTracker m_tracker; + // the actual tracked service... 
+ private final AtomicReference<Object> m_serviceRef; - public void addDependency(Class<?> iface, String extraFilter, DependencyCallback inject) throws Exception { - synchronized (this) { - if (m_tracking) { - throw new IllegalStateException("Can not add dependecies while tracking"); - } - } - Filter filter = null; - if (extraFilter != null) { - filter = FrameworkUtil.createFilter("(&(" + Constants.OBJECTCLASS + "=" + iface.getName() + ")" + extraFilter + ")"); - } - else { - filter = FrameworkUtil.createFilter("(" + Constants.OBJECTCLASS + "=" + iface.getName() + ")"); + public ServiceDependency(DependencyTrackerImpl manager, String filterString, DependencyCallback callback) throws Exception { + m_manager = manager; + m_calback = callback; + + m_tracker = new ServiceDependencyTracker(this, manager.getBundleContext(), FrameworkUtil.createFilter(filterString)); + m_serviceRef = new AtomicReference<Object>(); } - ServiceDependency dependency = new ServiceDependency(this, filter, inject); - m_dependencies.add(dependency); - } - public void startTracking() throws Exception { - synchronized (this) { - if (m_tracking) { - throw new IllegalStateException("Allready started tracking"); - } - m_tracking = true; + public Object getService() { + return m_serviceRef.get(); } - for (ServiceDependency dependency : m_dependencies) { - dependency.startTracking(); + + public boolean isServiceAvailable() { + return getService() != null; } - } - public void stopTracking() { - synchronized (this) { - if (!m_tracking) { - throw new IllegalStateException("Did not start tracking yet"); - } - m_tracking = false; + public void startTracking() { + m_tracker.open(); } - for (ServiceDependency dependency : m_dependencies) { - dependency.stopTracking(); + + public void stopTracking() { + m_tracker.close(); } - } - private void update() { - // As this is a simple internal implementation we assume we can safely invoke - // callbacks while holding locks. 
- synchronized (this) { - if (dependenciesAvailable()) { - if (m_started) { - stopCallback(); + void changed(ServiceReference ref) { + Object service = (ref == null) ? null : m_manager.getBundleContext().getService(ref); + Object oldService; + do { + oldService = m_serviceRef.get(); + } + while (!m_serviceRef.compareAndSet(oldService, service)); + + // Check on reference(!) to determine whether the service is changed... + if (oldService != service) { + if (m_calback != null) { + m_calback.updated(service); } - serviceCallbacks(); - startCallback(); - } - else { - if (m_started) { - stopCallback(); - serviceCallbacks(); - } - } - } - } - private boolean dependenciesAvailable() { - boolean available = true; - for (ServiceDependency dependency : m_dependencies) { - if (dependency.getService() == null) { - available = false; - break; + m_manager.update(); } } - return available; } - private void startCallback() { - try { - m_callbacks.started(); - m_started = true; - } - catch (Exception e) { - // really must not happen - e.printStackTrace(); + /** + * Service tracker that calls ServiceDependency#changed with the highest matching service reference whenever something + * changes.
+ */ + private static class ServiceDependencyTracker extends ServiceTracker { + private final CopyOnWriteArrayList<ServiceReference> m_trackedServiceRefs; + private final ServiceDependency m_dependency; + + public ServiceDependencyTracker(ServiceDependency dependency, BundleContext context, Filter filter) { + super(context, filter, null); + m_dependency = dependency; + m_trackedServiceRefs = new CopyOnWriteArrayList<ServiceReference>(); } - } - private void stopCallback() { - try { - m_callbacks.stopped(); - m_started = false; + @Override + public Object addingService(ServiceReference reference) { + if (m_trackedServiceRefs.addIfAbsent(reference)) { + checkForUpdate(); + } + return super.addingService(reference); } - catch (Exception e) { - // really must not happen - e.printStackTrace(); + + @Override + public void modifiedService(ServiceReference reference, Object service) { + checkForUpdate(); } - } - private void serviceCallbacks() { - for (ServiceDependency dependency : m_dependencies) { - try { - dependency.invokeCallback(); + @Override + public void removedService(ServiceReference reference, Object service) { + if (m_trackedServiceRefs.remove(reference)) { + checkForUpdate(); } - catch (Exception e) { - // really must not happen - e.printStackTrace(); + } + + private void checkForUpdate() { + ServiceReference highestReference = null; + for (ServiceReference reference : m_trackedServiceRefs) { + if (highestReference == null || highestReference.compareTo(reference) < 1) { + highestReference = reference; + } } + + m_dependency.changed(highestReference); } } - private static class ServiceDependency { - - private final DependencyTrackerImpl m_manager; - private final Filter m_filter; - private final DependencyCallback m_calback; - private final ServiceTracker m_tracker; - private volatile Object m_service; + private final BundleContext m_bundleContext; + private final LifecycleCallback m_callback; + private final CopyOnWriteArrayList<ServiceDependency> 
m_dependencies; + private final AtomicBoolean m_tracking; + private final AtomicBoolean m_started; - public ServiceDependency(DependencyTrackerImpl manager, Filter filter, DependencyCallback callback) throws Exception { - m_manager = manager; - m_filter = filter; - m_calback = callback; - m_tracker = new ServiceDependencyTracker(this); - } + /** + * Creates a new {@link DependencyTrackerImpl} instance. + * + * @param bundleContext + * the bundle context; + * @param callback + * the component callback. + */ + public DependencyTrackerImpl(BundleContext bundleContext, LifecycleCallback callback) { + m_bundleContext = bundleContext; + m_callback = callback; - public BundleContext getBundleContext() { - return m_manager.getBundleContext(); - } + m_dependencies = new CopyOnWriteArrayList<ServiceDependency>(); + m_tracking = new AtomicBoolean(false); + m_started = new AtomicBoolean(false); + } - public Filter getFilter() { - return m_filter; + /** + * Adds a dependency to track. + * + * @param iface + * the interface of the dependency to track; + * @param extraFilter + * an optional filter for the tracked dependency; + * @param callback + * the callback to call when the dependency becomes (un)available.
+ */ + public void addDependency(Class<?> iface, String extraFilter, DependencyCallback callback) throws Exception { + if (m_tracking.get()) { + throw new IllegalStateException("Can not add new dependency while tracking is started!"); } - public Object getService() { - return m_service; + String filter = String.format("(%s=%s)", Constants.OBJECTCLASS, iface.getName()); + if (extraFilter != null) { + filter = String.format("(&%s%s)", filter, extraFilter); } - public void startTracking() { - if (m_tracker == null) { - } - m_tracker.open(); - } + m_dependencies.addIfAbsent(new ServiceDependency(this, filter, callback)); + } - public void stopTracking() { - m_tracker.close(); - } + public BundleContext getBundleContext() { + return m_bundleContext; + } - void invokeCallback() { - if (m_calback != null) { - m_calback.updated(m_service); - } + /** + * Starts tracking all dependencies, if all dependencies are satisfied, + * {@link LifecycleCallback#componentStarted(BundleContext)} will be called. For each satisfied dependency, + * {@link DependencyCallback#updated(Object)} is called. + * + * @throws IllegalStateException + * in case this tracker is already started. + */ + public void startTracking() throws Exception { + // This method should be called once and only once... + if (!m_tracking.compareAndSet(false, true)) { + throw new IllegalStateException("Already started tracking!"); } - void changed(Object service) { - // Sync on manager to ensure all dependency updates happen in order - synchronized (m_manager) { - m_service = service; - m_manager.update(); - } + for (ServiceDependency dependency : m_dependencies) { + dependency.startTracking(); } } /** - * Custom service tracker to simply construction. + * Stops tracking of dependencies. For each tracked dependency, {@link DependencyCallback#updated(Object)} is called + * with a <code>null</code> value. * + * @throws IllegalStateException + * in case this tracker is not started yet.
*/ - private static class ServiceDependencyTracker extends ServiceTracker { - - public ServiceDependencyTracker(ServiceDependency dependency) { - super(dependency.getBundleContext(), dependency.getFilter(), new ServiceDependencyTrackerCustomizer(dependency)); + public void stopTracking() { + // This method should be called once and only once... + if (!m_tracking.compareAndSet(true, false)) { + throw new IllegalStateException("Did not start tracking yet"); + } + for (ServiceDependency dependency : m_dependencies) { + dependency.stopTracking(); } } /** - * Tracker customizer that calls AgentContextDependency#changed with the highest matching service whenever something - * changes. + * Called for each change in the dependency set. It will call + * {@link LifecycleCallback#componentStopped(BundleContext)} if needed, and + * {@link LifecycleCallback#componentStarted(BundleContext)} when all dependencies are met. */ - private static class ServiceDependencyTrackerCustomizer implements ServiceTrackerCustomizer { + public void update() { + stopComponent(); - private final Map<ServiceReference, Object> m_trackedServices = new HashMap<ServiceReference, Object>(); - private final ServiceDependency m_dependency; - private volatile Object m_service; - - public ServiceDependencyTrackerCustomizer(ServiceDependency dependency) { - m_dependency = dependency; + if (allDependenciesAvailable()) { + startComponent(); } + } - @Override - public Object addingService(ServiceReference reference) { - Object service = m_dependency.getBundleContext().getService(reference); - synchronized (m_trackedServices) { - m_trackedServices.put(reference, service); - checkForUpdate(); - return service; + /** + * @return <code>true</code> if all dependencies are available, <code>false</code> otherwise. 
+ */ + final boolean allDependenciesAvailable() { + for (ServiceDependency dependency : m_dependencies) { + if (!dependency.isServiceAvailable()) { + return false; } } + return true; + } - @Override - public void modifiedService(ServiceReference reference, Object service) { - synchronized (m_trackedServices) { - m_trackedServices.put(reference, service); - checkForUpdate(); + /** + * Tries to start the component, if it is not already started. + */ + final void startComponent() { + // Only call our callback when we're actually starting tracking dependencies... + if (m_started.compareAndSet(false, true)) { + try { + m_callback.componentStarted(m_bundleContext); } - } - - @Override - public void removedService(ServiceReference reference, Object service) { - synchronized (m_trackedServices) { - m_trackedServices.remove(reference); - checkForUpdate(); + catch (Exception e) { + // really must not happen + e.printStackTrace(); } } + } - private void checkForUpdate() { - ServiceReference highestReference = null; - if (!m_trackedServices.isEmpty()) { - for (ServiceReference reference : m_trackedServices.keySet()) { - if (highestReference == null || highestReference.compareTo(reference) < 1) { - highestReference = reference; - } - } + /** + * Tries to stop the component, if it is not already stopped. + */ + final void stopComponent() { + // Only call our callback when we're actually started tracking dependencies... + if (m_started.compareAndSet(true, false)) { + try { + m_callback.componentStopped(m_bundleContext); } - Object service = highestReference == null ? 
null : m_trackedServices.get(highestReference); - if (m_service == null || m_service != service) { - m_service = service; - m_dependency.changed(service); + catch (Exception e) { + // really must not happen + e.printStackTrace(); } } } Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java (original) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DeploymentHandlerImpl.java Tue Sep 10 15:14:46 2013 @@ -18,11 +18,11 @@ */ package org.apache.ace.agent.impl; +import static org.apache.ace.agent.impl.ReflectionUtil.configureField; +import static org.apache.ace.agent.impl.ReflectionUtil.invokeMethod; + import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.AccessibleObject; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.net.MalformedURLException; import java.net.URL; import java.util.HashMap; @@ -82,11 +82,15 @@ public class DeploymentHandlerImpl exten @Override public Version getInstalledVersion() { Version highestVersion = Version.emptyVersion; + String identification = getIdentification(); + DeploymentPackage[] installedPackages = m_deploymentAdmin.listDeploymentPackages(); for (DeploymentPackage installedPackage : installedPackages) { - if (installedPackage.getName().equals(getIdentification()) - && installedPackage.getVersion().compareTo(highestVersion) > 0) { - highestVersion = installedPackage.getVersion(); + String packageId = installedPackage.getName(); + Version packageVersion = installedPackage.getVersion(); + + if (identification.equals(packageId) && packageVersion.compareTo(highestVersion) > 0) { + 
highestVersion = packageVersion; } } return highestVersion; @@ -118,8 +122,7 @@ public class DeploymentHandlerImpl exten }; private URL getPackageURL(Version version, boolean fixPackage) throws RetryAfterException, IOException { - URL url = getEndpoint(getServerURL(), getIdentification(), fixPackage ? getInstalledVersion() : Version.emptyVersion, version); - return url; + return getEndpoint(getServerURL(), getIdentification(), fixPackage ? getInstalledVersion() : Version.emptyVersion, version); } private URL getEndpoint(URL serverURL, String identification) { @@ -145,41 +148,10 @@ public class DeploymentHandlerImpl exten } } - private static void configureField(Object object, Class<?> iface, Object instance) { - // Note: Does not check super classes! - Field[] fields = object.getClass().getDeclaredFields(); - AccessibleObject.setAccessible(fields, true); - for (int j = 0; j < fields.length; j++) { - if (fields[j].getType().equals(iface)) { - try { - fields[j].set(object, instance); - } - catch (Exception e) { - e.printStackTrace(); - throw new IllegalStateException("Coudld not set field " + fields[j].getName() + " on " + object); - } - } - } - } - - private static Object invokeMethod(Object object, String methodName, Class<?>[] signature, Object[] parameters) { - // Note: Does not check super classes! - Class<?> clazz = object.getClass(); - try { - Method method = clazz.getDeclaredMethod(methodName, signature); - return method.invoke(object, parameters); - } - catch (Exception e) { - e.printStackTrace(); - } - return null; - } - /** * Internal EventAdmin that delegates to actual InternalEvents. Used to inject into the DeploymentAdmin only. 
*/ - class EventAdminBridge implements EventAdmin { - + final class EventAdminBridge implements EventAdmin { @Override public void postEvent(Event event) { getEventsHandler().postEvent(event.getTopic(), getPayload(event)); @@ -202,8 +174,7 @@ public class DeploymentHandlerImpl exten /** * Internal LogService that wraps delegates to actual InternalLogger. Used to inject into the DeploymentAdmin only. */ - class LogServiceBridge implements LogService { - + final class LogServiceBridge implements LogService { @Override public void log(int level, String message) { log(level, message, null); @@ -237,5 +208,4 @@ public class DeploymentHandlerImpl exten log(level, message, exception); } } - } Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DiscoveryHandlerImpl.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DiscoveryHandlerImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DiscoveryHandlerImpl.java (original) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DiscoveryHandlerImpl.java Tue Sep 10 15:14:46 2013 @@ -20,111 +20,203 @@ package org.apache.ace.agent.impl; import static org.apache.ace.agent.AgentConstants.CONFIG_DISCOVERY_CHECKING; import static org.apache.ace.agent.AgentConstants.CONFIG_DISCOVERY_SERVERURLS; +import static org.apache.ace.agent.impl.InternalConstants.AGENT_CONFIG_CHANGED; import java.io.IOException; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.atomic.AtomicLong; import org.apache.ace.agent.DiscoveryHandler; +import org.apache.ace.agent.EventListener; /** * Default thread-safe {@link DiscoveryHandler} implementation that reads the serverURL(s) from the configuration using * key {@link CONFIG_DISCOVERY_SERVERURLS}. If the {@link CONFIG_DISCOVERY_CHECKING} flag is set, a connection is opened to * test whether a serverURL is available before it is returned. */ -public class DiscoveryHandlerImpl extends ComponentBase implements DiscoveryHandler { +public class DiscoveryHandlerImpl extends ComponentBase implements DiscoveryHandler, EventListener { - private final Map<String, CheckedURL> m_availableURLs = new HashMap<String, DiscoveryHandlerImpl.CheckedURL>(); - private final Map<String, CheckedURL> m_blacklistedURLs = new HashMap<String, DiscoveryHandlerImpl.CheckedURL>(); + private static class CheckedURL { + /** cache timeout in milliseconds. */ + private static final long CACHE_TIME = 30000; + + public final URL m_url; + private final AtomicLong m_timestamp; + private final AtomicBoolean m_blackListed; + + public CheckedURL(URL url) { + m_url = url; + m_blackListed = new AtomicBoolean(false); + m_timestamp = new AtomicLong(0L); + } - private static final long CACHE_TIME = 2000; + public void available() { + m_blackListed.set(false); + m_timestamp.set(System.currentTimeMillis()); + } + + public void blacklist() { + m_blackListed.set(true); + m_timestamp.set(System.currentTimeMillis()); + } + + public boolean isBlacklisted() { + boolean result = m_blackListed.get(); + if (result) { + if (!isRecentlyChecked()) { + // lift the ban... + m_blackListed.compareAndSet(result, false); + result = false; + } + } + return result; + } + + public boolean isRecentlyChecked() { + return m_timestamp.get() > (System.currentTimeMillis() - CACHE_TIME); + } + } + + /** default server URL. */ + private static final String DEFAULT_SERVER_URL = "http://localhost:8080"; + /** whether or not to test server URLs.
*/ + private static final boolean DEFAULT_CHECK_SERVER_ULRS = false; + + private final List<String> m_urls; + private final AtomicBoolean m_checkURLs; + private final ConcurrentMap<String, CheckedURL> m_availableURLs; public DiscoveryHandlerImpl() { super("discovery"); - } - @Override - protected void onStop() throws Exception { - m_availableURLs.clear(); - m_blacklistedURLs.clear(); + m_availableURLs = new ConcurrentHashMap<String, CheckedURL>(); + m_checkURLs = new AtomicBoolean(DEFAULT_CHECK_SERVER_ULRS); + m_urls = new ArrayList<String>(Arrays.asList(DEFAULT_SERVER_URL)); } - // TODO Pretty naive implementation below. It always takes the first configured URL it can connect to and is not - // thread-safe. + /** + * Returns the first available URL, based on the order specified in the configuration. + * + * @return a (valid) server URL, or <code>null</code> in case no server URL was valid. + */ @Override public URL getServerUrl() { - - String configValue = getConfigurationHandler().get(CONFIG_DISCOVERY_SERVERURLS, "http://localhost:8080"); - boolean checking = getConfigurationHandler().getBoolean(CONFIG_DISCOVERY_CHECKING, false); + String[] urls; + synchronized (m_urls) { + urls = new String[m_urls.size()]; + m_urls.toArray(urls); + } + boolean checking = m_checkURLs.get(); URL url = null; - if (configValue.indexOf(",") == -1) { - url = getURL(configValue.trim(), checking); - } - else { - for (String configValuePart : configValue.split(",")) { - url = getURL(configValuePart.trim(), checking); - if (url != null) { - break; - } + for (String urlValue : urls) { + if ((url = getURL(urlValue, checking)) != null) { + break; } } + if (url == null) { - logWarning("No connectable serverUrl available"); + logWarning("No valid server URL discovered?!"); } + return url; } - private static class CheckedURL { - URL url; - long timestamp; + @Override + public void handle(String topic, Map<String, String> payload) { + if (AGENT_CONFIG_CHANGED.equals(topic)) { + String value = 
payload.get(CONFIG_DISCOVERY_SERVERURLS); + if (value != null && !"".equals(value.trim())) { + String[] urls = value.trim().split("\\s*,\\s*"); + + synchronized (m_urls) { + m_urls.clear(); + m_urls.addAll(Arrays.asList(urls)); + } + // Assume nothing about the newly configured URLs... + m_availableURLs.clear(); + } - public CheckedURL(URL url, long timestamp) { - this.url = url; - this.timestamp = timestamp; + value = payload.get(CONFIG_DISCOVERY_CHECKING); + if (value != null) { + boolean checkURLs = Boolean.parseBoolean(value); + // last one wins... + m_checkURLs.set(checkURLs); + } } } - private URL getURL(String serverURL, boolean checking) { + @Override + protected void onInit() throws Exception { + getEventsHandler().addListener(this); + } + + @Override + protected void onStop() throws Exception { + getEventsHandler().removeListener(this); + + m_availableURLs.clear(); + } + + private URL getURL(String serverURL, boolean checkURL) { + CheckedURL checkedURL = null; + URL result = null; - URL url = null; try { - CheckedURL blackListed = m_blacklistedURLs.get(serverURL); - if (blackListed != null && blackListed.timestamp > (System.currentTimeMillis() - CACHE_TIME)) { - logDebug("Ignoring blacklisted serverURL: " + serverURL); - return null; - } + logDebug("Start getting URL for : %s", serverURL); - url = new URL(serverURL); - if (!checking) { - return url; + checkedURL = m_availableURLs.get(serverURL); + if (checkedURL == null) { + checkedURL = new CheckedURL(new URL(serverURL)); + + CheckedURL putResult = m_availableURLs.putIfAbsent(serverURL, checkedURL); + if (putResult != null) { + // lost the put, make sure to use the correct object... 
+ checkedURL = putResult; + } } - CheckedURL available = m_availableURLs.get(serverURL); - if (available != null && available.timestamp > (System.currentTimeMillis() - CACHE_TIME)) { - logDebug("Returning available serverURL: " + available.url.toExternalForm()); - return available.url; + if (checkedURL.isBlacklisted()) { + logDebug("Ignoring blacklisted serverURL: %s", serverURL); + // Take the short way home... + return null; } - tryConnect(url); - logDebug("Succesfully connected to serverURL: %s", serverURL); - m_availableURLs.put(serverURL, new CheckedURL(url, System.currentTimeMillis())); - return url; + result = checkedURL.m_url; + if (checkURL && !checkedURL.isRecentlyChecked()) { + logDebug("Trying to connect to serverURL: %s", serverURL); + + tryConnect(checkedURL.m_url); + // no exception was thrown trying to connect to the URL, so assume it's available... + checkedURL.available(); + + logDebug("Succesfully connected to serverURL: %s", serverURL); + } } catch (MalformedURLException e) { - logError("Temporarily blacklisting malformed serverURL: " + serverURL); - m_blacklistedURLs.put(serverURL, new CheckedURL(url, System.currentTimeMillis())); - return null; + logWarning("Ignoring invalid/malformed serverURL: %s", serverURL); + // No need to blacklist for this case, we're trying to create a CheckedURL which isn't present... 
+ result = null; } catch (IOException e) { - logWarning("Temporarily blacklisting unavailable serverURL: " + serverURL); - m_blacklistedURLs.put(serverURL, new CheckedURL(url, System.currentTimeMillis())); - return null; + logWarning("Temporarily blacklisting unavailable serverURL: %s", serverURL); + if (checkedURL != null) { + checkedURL.blacklist(); + } + result = null; } + + return result; } private void tryConnect(URL serverURL) throws IOException { @@ -134,8 +226,9 @@ public class DiscoveryHandlerImpl extend connection.connect(); } finally { - if (connection != null && connection instanceof HttpURLConnection) + if (connection instanceof HttpURLConnection) { ((HttpURLConnection) connection).disconnect(); + } } } } Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java (original) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DownloadHandleImpl.java Tue Sep 10 15:14:46 2013 @@ -32,7 +32,6 @@ import org.apache.ace.agent.DownloadStat /** * A {@link DownloadHandle} implementation that supports pause/resume semantics based on HTTP Range headers assuming the * server supports this feature. 
- * */ class DownloadHandleImpl implements DownloadHandle { @@ -80,8 +79,9 @@ class DownloadHandleImpl implements Down } DownloadHandle start(int failAtPosition) { - if (m_started) - throw new IllegalStateException("Can not call start on a handle that is allready started"); + if (m_started) { + throw new IllegalStateException("Can not call start on a handle that is already started"); + } if (m_file == null) { try { m_file = File.createTempFile("download", ".bin"); @@ -96,8 +96,9 @@ class DownloadHandleImpl implements Down @Override public DownloadHandle stop() { - if (!m_started && !m_completed) + if (!m_started && !m_completed) { throw new IllegalStateException("Can not call stop on a handle that is not yet started"); + } m_started = false; stopDownload(); return this; @@ -174,22 +175,24 @@ class DownloadHandleImpl implements Down } private static void callProgressListener(ProgressListener listener, long contentLength, long progress) { - if (listener != null) + if (listener != null) { try { listener.progress(contentLength, progress); } catch (Exception e) { // ignore } + } } private static void callCompletionListener(ResultListener listener, DownloadResult result) { - if (listener != null && result != null) + if (listener != null && result != null) { try { listener.completed(result); } catch (Exception e) { // ignore } + } } } Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java (original) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java Tue Sep 10 15:14:46 2013 @@ -21,6 +21,7 @@ package org.apache.ace.agent.impl; import java.io.IOException; import java.util.HashMap; 
import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ace.agent.EventListener; import org.apache.ace.agent.FeedbackChannel; @@ -45,50 +46,58 @@ public class EventLoggerImpl extends Com public static final String TOPIC_COMPLETE = "org/osgi/service/deployment/COMPLETE"; private final BundleContext m_bundleContext; - private volatile boolean m_isStarted = false; + private final AtomicBoolean m_isStarted; public EventLoggerImpl(BundleContext bundleContext) { super("auditlogger"); + m_bundleContext = bundleContext; + m_isStarted = new AtomicBoolean(false); } @Override - protected void onStart() throws Exception { + protected void onInit() throws Exception { getEventsHandler().addListener(this); - m_bundleContext.addBundleListener(this); - m_bundleContext.addFrameworkListener(this); - m_isStarted = true; + } + + @Override + protected void onStart() throws Exception { + if (m_isStarted.compareAndSet(false, true)) { + m_bundleContext.addBundleListener(this); + m_bundleContext.addFrameworkListener(this); + } } @Override protected void onStop() throws Exception { - m_isStarted = false; - getEventsHandler().removeListener(this); - m_bundleContext.removeBundleListener(this); - m_bundleContext.removeFrameworkListener(this); + if (m_isStarted.compareAndSet(true, false)) { + getEventsHandler().removeListener(this); + + m_bundleContext.removeBundleListener(this); + m_bundleContext.removeFrameworkListener(this); + } } @Override public void handle(String topic, Map<String, String> payload) { - if (!m_isStarted) { + if (!m_isStarted.get()) { return; } int eventType = AuditEvent.DEPLOYMENTADMIN_BASE; Map<String, String> props = new HashMap<String, String>(); - if (topic.equals(TOPIC_INSTALL)) { + if (TOPIC_INSTALL.equals(topic)) { String deplPackName = payload.get("deploymentpackage.name"); eventType = AuditEvent.DEPLOYMENTADMIN_INSTALL; props.put(AuditEvent.KEY_NAME, deplPackName); } - - else if (topic.equals(TOPIC_UNINSTALL)) { + else if 
(TOPIC_UNINSTALL.equals(topic)) { String deplPackName = payload.get("deploymentpackage.name"); eventType = AuditEvent.DEPLOYMENTADMIN_UNINSTALL; props.put(AuditEvent.KEY_NAME, deplPackName); } - else if (topic.equals(TOPIC_COMPLETE)) { + else if (TOPIC_COMPLETE.equals(topic)) { eventType = AuditEvent.DEPLOYMENTADMIN_COMPLETE; props.put(AuditEvent.KEY_NAME, payload.get("deploymentpackage.name")); props.put(AuditEvent.KEY_VERSION, getDeploymentHandler().getInstalledVersion().toString()); @@ -99,7 +108,7 @@ public class EventLoggerImpl extends Com @Override public void bundleChanged(BundleEvent event) { - if (!m_isStarted) { + if (!m_isStarted.get()) { return; } @@ -155,7 +164,7 @@ public class EventLoggerImpl extends Com @Override public void frameworkEvent(FrameworkEvent event) { - if (!m_isStarted) { + if (!m_isStarted.get()) { return; } int eventType = AuditEvent.FRAMEWORK_BASE; @@ -221,10 +230,12 @@ public class EventLoggerImpl extends Com if (channel != null) { channel.write(eventType, payload); } + else { + logWarning("Feedback event *not* written as no channel is available!"); + } } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + logWarning("Failed to write feedback event!", e); } } } Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java?rev=1521521&r1=1521520&r2=1521521&view=diff ============================================================================== --- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java (original) +++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventsHandlerImpl.java Tue Sep 10 15:14:46 2013 @@ -18,15 +18,12 @@ */ package org.apache.ace.agent.impl; -import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.ace.agent.EventListener; import 
org.apache.ace.agent.EventsHandler; import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; -import org.osgi.framework.Filter; import org.osgi.framework.ServiceReference; import org.osgi.util.tracker.ServiceTracker; import org.osgi.util.tracker.ServiceTrackerCustomizer; @@ -37,45 +34,20 @@ import org.osgi.util.tracker.ServiceTrac * {@link #removeListener(EventListener)}. */ public class EventsHandlerImpl extends ComponentBase implements EventsHandler { - - private final List<EventListener> m_listeners = new CopyOnWriteArrayList<EventListener>(); + private final CopyOnWriteArrayList<EventListener> m_listeners = new CopyOnWriteArrayList<EventListener>(); private final BundleContext m_bundleContext; - - private ServiceTracker m_tracker; + // + private volatile ServiceTracker m_tracker; public EventsHandlerImpl(BundleContext bundleContext) throws Exception { super("events"); - m_bundleContext = bundleContext; - Filter listenerFilter = m_bundleContext.createFilter("(" + Constants.OBJECTCLASS + "=" + EventListener.class.getName() + ")"); - m_tracker = new ServiceTracker(m_bundleContext, listenerFilter, new ServiceTrackerCustomizer() { - - @Override - public Object addingService(ServiceReference reference) { - Object service = m_bundleContext.getService(reference); - addListener((EventListener) service); - return service; - } - - @Override - public void removedService(ServiceReference reference, Object service) { - removeListener((EventListener) service); - } - @Override - public void modifiedService(ServiceReference reference, Object service) { - } - }); - } - - @Override - protected void onStart() throws Exception { - m_tracker.open(); + m_bundleContext = bundleContext; } @Override - protected void onStop() throws Exception { - m_tracker.close(); - m_listeners.clear(); + public void addListener(EventListener listener) { + m_listeners.addIfAbsent(listener); } @Override @@ -96,8 +68,13 @@ public class EventsHandlerImpl extends C } @Override - public 
void sendEvent(final String topic, final Map<String, String> payload) { - for (final EventListener listener : m_listeners) { + public void removeListener(EventListener listener) { + m_listeners.remove(listener); + } + + @Override + public void sendEvent(String topic, Map<String, String> payload) { + for (EventListener listener : m_listeners) { try { listener.handle(topic, payload); } @@ -108,12 +85,30 @@ public class EventsHandlerImpl extends C } @Override - public void addListener(EventListener listener) { - m_listeners.add(listener); + protected void onInit() throws Exception { + m_tracker = new ServiceTracker(m_bundleContext, EventListener.class.getName(), new ServiceTrackerCustomizer() { + @Override + public Object addingService(ServiceReference reference) { + Object service = m_bundleContext.getService(reference); + addListener((EventListener) service); + return service; + } + + @Override + public void modifiedService(ServiceReference reference, Object service) { + } + + @Override + public void removedService(ServiceReference reference, Object service) { + removeListener((EventListener) service); + } + }); + m_tracker.open(); } @Override - public void removeListener(EventListener listener) { - m_listeners.remove(listener); + protected void onStop() throws Exception { + m_tracker.close(); + m_listeners.clear(); } }
