Author: not
Date: Tue Jul 26 17:56:04 2011
New Revision: 1151188
URL: http://svn.apache.org/viewvc?rev=1151188&view=rev
Log:
ARIES-713 Deliver a new ScheduledExecutorService class, and support, that
allows us to switch between ScheduledExecutorServices in the service registry
and default ones to allow better integration into server threading environments.
Note it is a very bad idea for the BlueprintExtender and the
BlueprintEventDispatcher to use the same thread pool.
Added:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/RWLock.java
(with props)
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java
(with props)
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java
(with props)
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java
(with props)
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java
(with props)
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java
(with props)
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java
(with props)
Modified:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintContainerImpl.java
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintExtender.java
Modified:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintContainerImpl.java
URL:
http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintContainerImpl.java?rev=1151188&r1=1151187&r2=1151188&view=diff
==============================================================================
---
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintContainerImpl.java
(original)
+++
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintContainerImpl.java
Tue Jul 26 17:56:04 2011
@@ -41,8 +41,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -122,7 +122,7 @@ public class BlueprintContainerImpl impl
private final List<Object> pathList;
private final ComponentDefinitionRegistryImpl componentDefinitionRegistry;
private final AggregateConverter converter;
- private final ScheduledThreadPoolExecutor executors;
+ private final ScheduledExecutorService executors;
private Set<URI> namespaces;
private State state = State.Unknown;
private NamespaceHandlerSet handlerSet;
@@ -143,7 +143,7 @@ public class BlueprintContainerImpl impl
private AccessControlContext accessControlContext;
private final IdSpace tempRecipeIdSpace = new IdSpace();
- public BlueprintContainerImpl(BundleContext bundleContext, Bundle
extenderBundle, BlueprintListener eventDispatcher, NamespaceHandlerRegistry
handlers, ScheduledThreadPoolExecutor executors, List<Object> pathList) {
+ public BlueprintContainerImpl(BundleContext bundleContext, Bundle
extenderBundle, BlueprintListener eventDispatcher, NamespaceHandlerRegistry
handlers, ScheduledExecutorService executors, List<Object> pathList) {
this.bundleContext = bundleContext;
this.extenderBundle = extenderBundle;
this.eventDispatcher = eventDispatcher;
@@ -176,7 +176,7 @@ public class BlueprintContainerImpl impl
return eventDispatcher;
}
- private void checkDirectives() {
+ private void readDirectives() {
Bundle bundle = bundleContext.getBundle();
Dictionary headers = bundle.getHeaders();
String symbolicName =
(String)headers.get(Constants.BUNDLE_SYMBOLICNAME);
@@ -246,7 +246,7 @@ public class BlueprintContainerImpl impl
LOGGER.debug("Running blueprint container for bundle {} in
state {}", bundleContext.getBundle().getSymbolicName(), state);
switch (state) {
case Unknown:
- checkDirectives();
+ readDirectives();
eventDispatcher.blueprintEvent(new
BlueprintEvent(BlueprintEvent.CREATING, getBundleContext().getBundle(),
getExtenderBundle()));
parser = new Parser();
parser.parse(getResources());
@@ -254,7 +254,6 @@ public class BlueprintContainerImpl impl
handlerSet = handlers.getNamespaceHandlers(namespaces,
getBundleContext().getBundle());
handlerSet.addListener(this);
state = State.WaitForNamespaceHandlers;
- break;
case WaitForNamespaceHandlers:
{
List<String> missing = new ArrayList<String>();
@@ -277,7 +276,6 @@ public class BlueprintContainerImpl impl
}
parser.populate(handlerSet,
componentDefinitionRegistry);
state = State.Populated;
- break;
}
case Populated:
getRepository();
@@ -296,7 +294,6 @@ public class BlueprintContainerImpl impl
};
timeoutFuture = executors.schedule(r, timeout,
TimeUnit.MILLISECONDS);
state = State.WaitForInitialReferences;
- break;
case WaitForInitialReferences:
if (waitForDependencies) {
String[] missingDependencies =
getMissingDependencies();
@@ -307,12 +304,10 @@ public class BlueprintContainerImpl impl
}
}
state = State.InitialReferencesSatisfied;
- break;
case InitialReferencesSatisfied:
processTypeConverters();
processProcessors();
state = State.WaitForInitialReferences2;
- break;
case WaitForInitialReferences2:
if (waitForDependencies) {
String[] missingDependencies =
getMissingDependencies();
@@ -323,10 +318,8 @@ public class BlueprintContainerImpl impl
}
}
state = State.Create;
- break;
case Create:
- timeoutFuture.cancel(false);
- executors.purge();
+ cancelFutureIfPresent();
registerServices();
instantiateEagerComponents();
// Register the BlueprintContainer in the OSGi registry
@@ -341,7 +334,6 @@ public class BlueprintContainerImpl impl
}
eventDispatcher.blueprintEvent(new
BlueprintEvent(BlueprintEvent.CREATED, getBundleContext().getBundle(),
getExtenderBundle()));
state = State.Created;
- break;
case Created:
case Failed:
return;
@@ -349,9 +341,7 @@ public class BlueprintContainerImpl impl
}
} catch (Throwable t) {
state = State.Failed;
- if (timeoutFuture != null) {
- timeoutFuture.cancel(false);
- }
+ cancelFutureIfPresent();
tidyupComponents();
LOGGER.error("Unable to start blueprint container for bundle " +
bundleContext.getBundle().getSymbolicName(), t);
eventDispatcher.blueprintEvent(new
BlueprintEvent(BlueprintEvent.FAILURE, getBundleContext().getBundle(),
getExtenderBundle(), t));
@@ -807,10 +797,7 @@ public class BlueprintContainerImpl impl
destroyed = true;
eventDispatcher.blueprintEvent(new
BlueprintEvent(BlueprintEvent.DESTROYING, getBundleContext().getBundle(),
getExtenderBundle()));
- if (timeoutFuture != null) {
- timeoutFuture.cancel(false);
- executors.purge();
- }
+ cancelFutureIfPresent();
AriesFrameworkUtil.safeUnregisterService(registration);
unregisterServices();
@@ -842,10 +829,7 @@ public class BlueprintContainerImpl impl
destroyed = true;
eventDispatcher.blueprintEvent(new
BlueprintEvent(BlueprintEvent.DESTROYING, getBundleContext().getBundle(),
getExtenderBundle()));
- if (timeoutFuture != null) {
- timeoutFuture.cancel(false);
- executors.purge();
- }
+ cancelFutureIfPresent();
AriesFrameworkUtil.safeUnregisterService(registration);
if (handlerSet != null) {
handlerSet.removeListener(this);
@@ -854,6 +838,13 @@ public class BlueprintContainerImpl impl
LOGGER.debug("Blueprint container quiesced: {}", this.bundleContext);
}
+ private void cancelFutureIfPresent()
+ {
+ if (timeoutFuture != null) {
+ timeoutFuture.cancel(false);
+ }
+ }
+
public void namespaceHandlerRegistered(URI uri) {
if (namespaces != null && namespaces.contains(uri)) {
schedule();
@@ -874,6 +865,4 @@ public class BlueprintContainerImpl impl
destroyComponents();
untrackServiceReferences();
}
-
}
-
Modified:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java
URL:
http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java?rev=1151188&r1=1151187&r2=1151188&view=diff
==============================================================================
---
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java
(original)
+++
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintEventDispatcher.java
Tue Jul 26 17:56:04 2011
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -49,6 +50,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.aries.blueprint.utils.JavaUtils;
+import
org.apache.aries.blueprint.utils.threading.ScheduledExecutorServiceWrapper;
+import
org.apache.aries.blueprint.utils.threading.ScheduledExecutorServiceWrapper.ScheduledExecutorServiceFactory;
/**
* The delivery of {@link BlueprintEvent}s is complicated. The blueprint
extender and its containers use this class to
@@ -62,7 +65,7 @@ class BlueprintEventDispatcher implement
private final Set<BlueprintListener> listeners = new
CopyOnWriteArraySet<BlueprintListener>();
private final Map<Bundle, BlueprintEvent> states = new
ConcurrentHashMap<Bundle, BlueprintEvent>();
- private final ExecutorService executor =
Executors.newSingleThreadExecutor(new BlueprintThreadFactory("Blueprint Event
Dispatcher"));
+ private final ExecutorService executor;
private final ExecutorService sharedExecutor;
private final EventAdminListener eventAdminListener;
private final ServiceTracker containerListenerTracker;
@@ -71,7 +74,17 @@ class BlueprintEventDispatcher implement
assert bundleContext != null;
assert sharedExecutor != null;
+
+ executor = new ScheduledExecutorServiceWrapper(bundleContext,
"Blueprint Event Dispatcher", new ScheduledExecutorServiceFactory() {
+
+ public ScheduledExecutorService create(String name)
+ {
+ return Executors.newScheduledThreadPool(1, new
BlueprintThreadFactory(name));
+ }
+ });
+// executor = Executors.newSingleThreadExecutor(new
BlueprintThreadFactory("Blueprint Event Dispatcher"));
+
this.sharedExecutor = sharedExecutor;
EventAdminListener listener = null;
Modified:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintExtender.java
URL:
http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintExtender.java?rev=1151188&r1=1151187&r2=1151188&view=diff
==============================================================================
---
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintExtender.java
(original)
+++
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/container/BlueprintExtender.java
Tue Jul 26 17:56:04 2011
@@ -30,13 +30,16 @@ import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.aries.blueprint.BlueprintConstants;
import
org.apache.aries.blueprint.annotation.service.BlueprintAnnotationScanner;
import org.apache.aries.blueprint.namespace.NamespaceHandlerRegistryImpl;
import org.apache.aries.blueprint.services.ParserService;
import org.apache.aries.blueprint.utils.HeaderParser;
import org.apache.aries.blueprint.utils.HeaderParser.PathElement;
+import
org.apache.aries.blueprint.utils.threading.ScheduledExecutorServiceWrapper;
+import
org.apache.aries.blueprint.utils.threading.ScheduledExecutorServiceWrapper.ScheduledExecutorServiceFactory;
import org.apache.aries.proxy.ProxyManager;
import org.apache.aries.util.AriesFrameworkUtil;
import org.apache.aries.util.tracker.RecursiveBundleTracker;
@@ -68,7 +71,7 @@ public class BlueprintExtender implement
private static final Logger LOGGER =
LoggerFactory.getLogger(BlueprintExtender.class);
private BundleContext context;
- private ScheduledThreadPoolExecutor executors;
+ private ScheduledExecutorService executors;
private Map<Bundle, BlueprintContainerImpl> containers;
private BlueprintEventDispatcher eventDispatcher;
private NamespaceHandlerRegistry handlers;
@@ -82,7 +85,12 @@ public class BlueprintExtender implement
this.context = ctx;
handlers = new NamespaceHandlerRegistryImpl(ctx);
- executors = (ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(3, new BlueprintThreadFactory("Blueprint
Extender"));
+ executors = new ScheduledExecutorServiceWrapper(ctx, "Blueprint
Extender", new ScheduledExecutorServiceFactory() {
+ public ScheduledExecutorService create(String name)
+ {
+ return Executors.newScheduledThreadPool(3, new
BlueprintThreadFactory(name));
+ }
+ });
eventDispatcher = new BlueprintEventDispatcher(ctx, executors);
containers = new HashMap<Bundle, BlueprintContainerImpl>();
Added:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/RWLock.java
URL:
http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/RWLock.java?rev=1151188&view=auto
==============================================================================
---
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/RWLock.java
(added)
+++
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/RWLock.java
Tue Jul 26 17:56:04 2011
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.blueprint.utils.threading;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+public class RWLock
+{
+ private ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
+
+ public <T> T runReadOperation(Callable<T> call) throws Exception
+ {
+ ReadLock rl = _lock.readLock();
+ rl.lock();
+ try {
+ return call.call();
+ } finally {
+ rl.unlock();
+ }
+ }
+
+ public void runReadOperation(Runnable r)
+ {
+ ReadLock rl = _lock.readLock();
+ rl.lock();
+ try {
+ r.run();
+ } finally {
+ rl.unlock();
+ }
+ }
+
+ public <T> T runWriteOperation(Callable<T> call) throws Exception
+ {
+ WriteLock wl = _lock.writeLock();
+ wl.lock();
+ try {
+ return call.call();
+ } finally {
+ wl.unlock();
+ }
+ }
+
+ public void runWriteOperation(Runnable r)
+ {
+ WriteLock wl = _lock.writeLock();
+ wl.lock();
+ try {
+ r.run();
+ } finally {
+ wl.unlock();
+ }
+ }
+}
\ No newline at end of file
Propchange:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/RWLock.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java
URL:
http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java?rev=1151188&view=auto
==============================================================================
---
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java
(added)
+++
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java
Tue Jul 26 17:56:04 2011
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.blueprint.utils.threading;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.aries.blueprint.utils.threading.impl.Discardable;
+import org.apache.aries.blueprint.utils.threading.impl.DiscardableCallable;
+import org.apache.aries.blueprint.utils.threading.impl.DiscardableRunnable;
+import org.apache.aries.blueprint.utils.threading.impl.WrappedFuture;
+import org.apache.aries.blueprint.utils.threading.impl.WrappedScheduledFuture;
+import org.apache.aries.util.tracker.SingleServiceTracker;
+import
org.apache.aries.util.tracker.SingleServiceTracker.SingleServiceListener;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+
+/**
+ * This class looks like a ScheduledExecutorService to the outside world.
+ * Internally it uses either a default scheduled thread pool created by the
+ * supplied ScheduledExecutorServiceFactory, or it picks one up from the
+ * service registry. If it picks one up from the service registry then it
+ * shuts the internal one down. This doesn't fully meet the spec for a
+ * ScheduledExecutorService. It does not properly implement shutdownNow, but
+ * this isn't used by blueprint so for now that should be fine.
+ *
+ * <p>It also wraps the Runnables and Callables so when a task is canceled we
+ * quickly clean up memory rather than waiting for the target to get to the
+ * task and purge it.
+ * </p>
+ */
+public class ScheduledExecutorServiceWrapper implements
ScheduledExecutorService, SingleServiceListener
+{
+ public static interface ScheduledExecutorServiceFactory
+ {
+ public ScheduledExecutorService create(String name);
+ }
+
+ private final AtomicReference<ScheduledExecutorService> _current = new
AtomicReference<ScheduledExecutorService>();
+ private SingleServiceTracker<ScheduledExecutorService> _tracked;
+ private final AtomicReference<ScheduledExecutorService> _default = new
AtomicReference<ScheduledExecutorService>();
+ private final AtomicBoolean _shutdown = new AtomicBoolean();
+ private final Queue<Discardable<Runnable>> _unprocessedWork = new
LinkedBlockingQueue<Discardable<Runnable>>();
+ private final RWLock _lock = new RWLock();
+ private final AtomicInteger _invokeEntryCount = new AtomicInteger();
+ private final ScheduledExecutorServiceFactory _factory;
+ private final String _name;
+
+ public ScheduledExecutorServiceWrapper(BundleContext context, String name,
ScheduledExecutorServiceFactory sesf)
+ {
+ _name = name;
+ _factory = sesf;
+ try {
+ _tracked = new SingleServiceTracker<ScheduledExecutorService>(context,
ScheduledExecutorService.class, "(aries.blueprint.poolName=" + _name + ")",
this);
+ _tracked.open();
+ } catch (InvalidSyntaxException e) {
+ // Just ignore and stick with the default one.
+ }
+
+ if (_current.get() == null) {
+ _default.set(_factory.create(name));
+ if (!!!_current.compareAndSet(null, _default.get())) {
+ _default.getAndSet(null).shutdown();
+ }
+ }
+ }
+
+ /**
+  * Polls, for at most the given timeout, until no unprocessed work is queued
+  * and no invokeAll/invokeAny call is in progress. This method does not
+  * itself check whether shutdown has been requested.
+  *
+  * @param timeout the maximum time to wait
+  * @param unit the unit of the timeout argument
+  * @return true if there is no unprocessed work and no in-flight invoke call
+  *         when this method returns, false if the timeout elapsed first
+  * @throws InterruptedException if interrupted while sleeping between polls
+  */
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+ {
+   long timeLeftToWait = unit.toMillis(timeout);
+   // Poll at most once a second so we notice completion reasonably quickly.
+   long pausePeriod = timeLeftToWait;
+   if (pausePeriod > 1000) pausePeriod = 1000;
+   // Keep waiting while EITHER kind of outstanding work remains.
+   while ((!!!_unprocessedWork.isEmpty() || _invokeEntryCount.get() > 0) && timeLeftToWait > 0) {
+     Thread.sleep(pausePeriod);
+     timeLeftToWait -= pausePeriod;
+     // Never sleep past the caller's deadline on the final iteration.
+     if (timeLeftToWait < pausePeriod) pausePeriod = timeLeftToWait;
+   }
+   // Terminated only when the queue is drained AND no invoke call is running.
+   return _unprocessedWork.isEmpty() && _invokeEntryCount.get() == 0;
+ }
+
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>>
tasks)
+ throws InterruptedException
+ {
+ try {
+ return runUnlessShutdown(new Callable<List<Future<T>>>() {
+
+ public List<Future<T>> call() throws Exception
+ {
+ _invokeEntryCount.incrementAndGet();
+ try {
+ return _current.get().invokeAll(tasks);
+ } finally {
+ _invokeEntryCount.decrementAndGet();
+ }
+ }
+
+ });
+ } catch (InterruptedException e) { throw e;
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>>
tasks,
+ final long timeout,
+ final TimeUnit unit) throws InterruptedException
+ {
+ try {
+ return runUnlessShutdown(new Callable<List<Future<T>>>() {
+
+ public List<Future<T>> call() throws Exception
+ {
+ _invokeEntryCount.incrementAndGet();
+ try {
+ return _current.get().invokeAll(tasks, timeout, unit);
+ } finally {
+ _invokeEntryCount.decrementAndGet();
+ }
+ }
+
+ });
+ } catch (InterruptedException e) { throw e;
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws
InterruptedException,
+ ExecutionException
+ {
+ try {
+ return runUnlessShutdown(new Callable<T>() {
+
+ public T call() throws Exception
+ {
+ _invokeEntryCount.incrementAndGet();
+ try {
+ return _current.get().invokeAny(tasks);
+ } finally {
+ _invokeEntryCount.decrementAndGet();
+ }
+ }
+
+ });
+ } catch (InterruptedException e) { throw e;
+ } catch (ExecutionException e) { throw e;
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final
long timeout, final TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException
+ {
+ try {
+ return runUnlessShutdown(new Callable<T>() {
+
+ public T call() throws Exception
+ {
+ _invokeEntryCount.incrementAndGet();
+ try {
+ return _current.get().invokeAny(tasks, timeout, unit);
+ } finally {
+ _invokeEntryCount.decrementAndGet();
+ }
+ }
+
+ });
+ } catch (InterruptedException e) { throw e;
+ } catch (ExecutionException e) { throw e;
+ } catch (TimeoutException e) { throw e;
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ public boolean isShutdown()
+ {
+ return _shutdown.get();
+ }
+
+ public boolean isTerminated()
+ {
+ if (isShutdown()) return _unprocessedWork.isEmpty();
+ else return false;
+ }
+
+ public void shutdown()
+ {
+ _lock.runWriteOperation(new Runnable() {
+
+ public void run()
+ {
+ _shutdown.set(true);
+ ScheduledExecutorService s = _default.get();
+
+ if (s != null) s.shutdown();
+ }
+ });
+ }
+
+ public List<Runnable> shutdownNow()
+ {
+ try {
+ return _lock.runWriteOperation(new Callable<List<Runnable>>() {
+
+ public List<Runnable> call()
+ {
+ _shutdown.set(true);
+
+ ScheduledExecutorService s = _default.get();
+
+ if (s != null) s.shutdownNow();
+
+ List<Runnable> runnables = new ArrayList<Runnable>();
+
+ for (Discardable<Runnable> r : _unprocessedWork) {
+ Runnable newRunnable = r.discard();
+ if (newRunnable != null) {
+ runnables.add(newRunnable);
+ }
+ }
+
+ return runnables;
+ }
+
+ });
+ } catch (Exception e) {
+ // This won't happen since our callable doesn't throw any exceptions, so
+ // we just return an empty list
+ return Collections.emptyList();
+ }
+ }
+
+ public <T> Future<T> submit(final Callable<T> task)
+ {
+ try {
+ return runUnlessShutdown(new Callable<Future<T>>() {
+
+ public Future<T> call() throws Exception
+ {
+ DiscardableCallable<T> t = new DiscardableCallable<T>(task,
_unprocessedWork);
+ try {
+ return new WrappedFuture<T>(_current.get().submit((Callable<T>)t),
t) ;
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+
+ });
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public Future<?> submit(final Runnable task)
+ {
+ try {
+ return runUnlessShutdown(new Callable<Future<?>>() {
+
+ public Future<?> call()
+ {
+ DiscardableRunnable t = new DiscardableRunnable(task,
_unprocessedWork);
+ try {
+ return new WrappedFuture(_current.get().submit(t), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ public <T> Future<T> submit(final Runnable task, final T result)
+ {
+ try {
+ return runUnlessShutdown(new Callable<Future<T>>() {
+
+ public Future<T> call()
+ {
+ DiscardableRunnable t = new DiscardableRunnable(task,
_unprocessedWork);
+ try {
+ return new WrappedFuture<T>(_current.get().submit(t, result), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ public void execute(final Runnable command)
+ {
+ try {
+ runUnlessShutdown(new Callable<Object>() {
+
+ public Object call()
+ {
+ DiscardableRunnable t = new DiscardableRunnable(command,
_unprocessedWork);
+ try {
+ _current.get().execute(t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public ScheduledFuture<?> schedule(final Runnable command, final long delay,
final TimeUnit unit)
+ {
+ try {
+ return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
+
+ public ScheduledFuture<?> call()
+ {
+ DiscardableRunnable t = new DiscardableRunnable(command,
_unprocessedWork);
+ try {
+ return new WrappedScheduledFuture(_current.get().schedule(t,
delay, unit), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final
long delay, final TimeUnit unit)
+ {
+ try {
+ return runUnlessShutdown(new Callable<ScheduledFuture<V>>() {
+
+ public ScheduledFuture<V> call()
+ {
+ DiscardableCallable<V> c = new DiscardableCallable<V>(callable,
_unprocessedWork);
+ try {
+ return new
WrappedScheduledFuture<V>(_current.get().schedule((Callable<V>)c, delay, unit),
c);
+ } catch (RuntimeException e) {
+ c.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final
long initialDelay, final long period,
+ final TimeUnit unit)
+ {
+ try {
+ return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
+
+ public ScheduledFuture<?> call()
+ {
+ DiscardableRunnable t = new DiscardableRunnable(command,
_unprocessedWork);
+ try {
+ return new
WrappedScheduledFuture(_current.get().scheduleAtFixedRate(t, initialDelay,
period, unit), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command,
final long initialDelay, final long delay,
+ final TimeUnit unit)
+ {
+ try {
+ return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
+
+ public ScheduledFuture<?> call()
+ {
+ DiscardableRunnable t = new DiscardableRunnable(command,
_unprocessedWork);
+ try {
+ return new
WrappedScheduledFuture(_current.get().scheduleWithFixedDelay(t, initialDelay,
delay, unit), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+
+ public void serviceFound()
+ {
+ ScheduledExecutorService s = _default.get();
+ if (_current.compareAndSet(s, _tracked.getService())) {
+ if (s != null) {
+ if (_default.compareAndSet(s, null)) {
+ s.shutdown();
+ }
+ }
+ }
+ }
+
+ // TODO when lost or replaced we need to move work to the "new" _current.
This is a huge change because the futures are not currently stored.
+ public void serviceLost()
+ {
+ ScheduledExecutorService s = _default.get();
+
+ if (s == null) {
+ s = _factory.create(_name);
+ if (_default.compareAndSet(null, s)) {
+ _current.set(s);
+ }
+ }
+ }
+
+ public void serviceReplaced()
+ {
+ _current.set(_tracked.getService());
+ }
+
+ private <T> T runUnlessShutdown(final Callable<T> call) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ try {
+ return _lock.runReadOperation(new Callable<T>()
+ {
+ public T call() throws Exception
+ {
+ if (isShutdown()) throw new RejectedExecutionException();
+ return call.call();
+ }
+ });
+ } catch (InterruptedException e) { throw e;
+ } catch (ExecutionException e) { throw e;
+ } catch (TimeoutException e) { throw e;
+ } catch (RuntimeException e) { throw e;
+ } catch (Exception e) { throw new RejectedExecutionException(); }
+ }
+}
\ No newline at end of file
Propchange:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java
URL:
http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java?rev=1151188&view=auto
==============================================================================
---
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java
(added)
+++
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java
Tue Jul 26 17:56:04 2011
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.blueprint.utils.threading.impl;
+
+/**
+ * Something that can be discarded, yielding a value of type T.
+ *
+ * @param <T> the type handed back by {@link #discard()}
+ */
+public interface Discardable<T> {
+  /**
+   * Discards this object.
+   *
+   * The return type is the interface's own type parameter. It was previously
+   * a separate method-level type parameter that shadowed it, so the
+   * interface's type argument did not constrain the result.
+   *
+   * @return the value produced by discarding
+   */
+  T discard();
+}
\ No newline at end of file
Propchange:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java
URL:
http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java?rev=1151188&view=auto
==============================================================================
---
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java
(added)
+++
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java
Tue Jul 26 17:56:04 2011
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.blueprint.utils.threading.impl;
+
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Wraps a Callable so that it can be discarded before it is invoked.
+ *
+ * The public constructor adds the new instance to the supplied queue; the
+ * instance removes itself from that queue when it is called or discarded.
+ *
+ * @param <V> the result type of the wrapped Callable
+ */
+public class DiscardableCallable<V> implements Callable<V>, Runnable, Discardable<Runnable>
+{
+  /** The wrapped work; set to null once this instance has been discarded. */
+  private final AtomicReference<Callable<V>> _work;
+  /** The queue this instance removes itself from on call/discard. */
+  private final Queue<Discardable<Runnable>> _pending;
+
+  /**
+   * Wraps the call and adds this instance to the given queue.
+   *
+   * @param call the work to wrap
+   * @param _unprocessedWork the queue to add this instance to
+   */
+  public DiscardableCallable(Callable<V> call, Queue<Discardable<Runnable>> _unprocessedWork) {
+    _work = new AtomicReference<Callable<V>>(call);
+    _pending = _unprocessedWork;
+    _pending.add(this);
+  }
+
+  /** Wraps the call with a private, empty queue; used by discard(). */
+  private DiscardableCallable(Callable<V> call)
+  {
+    _work = new AtomicReference<Callable<V>>(call);
+    _pending = new LinkedBlockingQueue<Discardable<Runnable>>();
+  }
+
+  /**
+   * Removes this instance from its queue, clears the wrapped work and
+   * returns a new wrapper holding whatever work was taken out.
+   *
+   * @return a Runnable wrapping the work previously held here
+   */
+  public Runnable discard()
+  {
+    _pending.remove(this);
+    final Callable<V> taken = _work.getAndSet(null);
+    return new DiscardableCallable<V>(taken);
+  }
+
+  /**
+   * Removes this instance from its queue and invokes the wrapped work.
+   *
+   * @return the wrapped Callable's result
+   * @throws CancellationException if the work has been cleared by discard()
+   * @throws Exception whatever the wrapped Callable throws
+   */
+  public V call() throws Exception
+  {
+    _pending.remove(this);
+    final Callable<V> target = _work.get();
+    if (target == null) {
+      throw new CancellationException();
+    }
+    return target.call();
+  }
+
+  /** Runnable form of call(); the result and any Exception are dropped. */
+  public void run()
+  {
+    try {
+      call();
+    } catch (Exception ignored) {
+    }
+  }
+}
\ No newline at end of file
Propchange:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java
URL:
http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java?rev=1151188&view=auto
==============================================================================
---
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java
(added)
+++
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java
Tue Jul 26 17:56:04 2011
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.blueprint.utils.threading.impl;
+
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Wraps a Runnable so that it can be discarded before it is run.
+ *
+ * The public constructor adds the new instance to the supplied queue; the
+ * instance removes itself from that queue when it is run or discarded.
+ */
+public class DiscardableRunnable implements Runnable, Discardable<Runnable>
+{
+  /** The wrapped work; set to null once this instance has been discarded. */
+  private final AtomicReference<Runnable> _work;
+  /** The queue this instance removes itself from on run/discard. */
+  private final Queue<Discardable<Runnable>> _pending;
+
+  /**
+   * Wraps the runnable and adds this instance to the given queue.
+   *
+   * @param run the work to wrap
+   * @param _unprocessedWork the queue to add this instance to
+   */
+  public DiscardableRunnable(Runnable run, Queue<Discardable<Runnable>> _unprocessedWork) {
+    _work = new AtomicReference<Runnable>(run);
+    _pending = _unprocessedWork;
+    _pending.add(this);
+  }
+
+  /** Wraps the runnable with a private, empty queue; used by discard(). */
+  private DiscardableRunnable(Runnable run)
+  {
+    _work = new AtomicReference<Runnable>(run);
+    _pending = new LinkedBlockingQueue<Discardable<Runnable>>();
+  }
+
+  /**
+   * Removes this instance from its queue and runs the wrapped work, doing
+   * nothing further if the work has been cleared by discard(). The reference
+   * is read, not cleared, so the work remains available for later runs.
+   */
+  public void run()
+  {
+    _pending.remove(this);
+    final Runnable target = _work.get();
+    if (target == null) {
+      return;
+    }
+    target.run();
+  }
+
+  /**
+   * Removes this instance from its queue, clears the wrapped work and
+   * returns a new wrapper holding whatever work was taken out.
+   *
+   * @return a Runnable wrapping the work previously held here
+   */
+  public Runnable discard()
+  {
+    _pending.remove(this);
+    final Runnable taken = _work.getAndSet(null);
+    return new DiscardableRunnable(taken);
+  }
+}
\ No newline at end of file
Propchange:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java
URL:
http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java?rev=1151188&view=auto
==============================================================================
---
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java
(added)
+++
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java
Tue Jul 26 17:56:04 2011
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.blueprint.utils.threading.impl;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A Future that delegates every operation to another Future and, when a
+ * cancel succeeds, also discards an associated Discardable.
+ *
+ * @param <T> the result type of the wrapped Future
+ */
+public class WrappedFuture<T> implements Future<T>
+{
+  /** The Future all operations are forwarded to. */
+  private final Future<T> _delegate;
+  /** Discarded after the delegate reports a successful cancel. */
+  private final Discardable<?> _work;
+
+  /**
+   * @param f the Future to delegate to
+   * @param d the Discardable to discard on successful cancellation
+   */
+  public WrappedFuture(Future<T> f, Discardable<?> d) {
+    _delegate = f;
+    _work = d;
+  }
+
+  /**
+   * Cancels the delegate; only if that succeeds is the Discardable discarded.
+   *
+   * @param arg0 passed straight through to the delegate's cancel
+   * @return the delegate's cancel result
+   */
+  public boolean cancel(boolean arg0)
+  {
+    if (!_delegate.cancel(arg0)) {
+      return false;
+    }
+    _work.discard();
+    return true;
+  }
+
+  /** Delegates to the wrapped Future. */
+  public boolean isCancelled()
+  {
+    return _delegate.isCancelled();
+  }
+
+  /** Delegates to the wrapped Future. */
+  public boolean isDone()
+  {
+    return _delegate.isDone();
+  }
+
+  /** Delegates to the wrapped Future. */
+  public T get() throws InterruptedException, ExecutionException
+  {
+    return _delegate.get();
+  }
+
+  /** Delegates to the wrapped Future. */
+  public T get(long timeout, TimeUnit timeunit)
+      throws InterruptedException, ExecutionException, TimeoutException
+  {
+    return _delegate.get(timeout, timeunit);
+  }
+}
\ No newline at end of file
Propchange:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java
URL:
http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java?rev=1151188&view=auto
==============================================================================
---
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java
(added)
+++
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java
Tue Jul 26 17:56:04 2011
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.blueprint.utils.threading.impl;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A ScheduledFuture that delegates every operation to another
+ * ScheduledFuture and, when a cancel succeeds, also discards an associated
+ * Discardable.
+ *
+ * @param <T> the result type of the wrapped ScheduledFuture
+ */
+public class WrappedScheduledFuture<T> implements ScheduledFuture<T>
+{
+  /** The ScheduledFuture all operations are forwarded to. */
+  private final ScheduledFuture<T> _delegate;
+  /** Discarded after the delegate reports a successful cancel. */
+  private final Discardable<?> _work;
+
+  /**
+   * @param f the ScheduledFuture to delegate to
+   * @param d the Discardable to discard on successful cancellation
+   */
+  public WrappedScheduledFuture(ScheduledFuture<T> f, Discardable<?> d) {
+    _delegate = f;
+    _work = d;
+  }
+
+  /**
+   * Cancels the delegate; only if that succeeds is the Discardable discarded.
+   *
+   * @param arg0 passed straight through to the delegate's cancel
+   * @return the delegate's cancel result
+   */
+  public boolean cancel(boolean arg0)
+  {
+    if (!_delegate.cancel(arg0)) {
+      return false;
+    }
+    _work.discard();
+    return true;
+  }
+
+  /** Delegates to the wrapped ScheduledFuture. */
+  public boolean isCancelled()
+  {
+    return _delegate.isCancelled();
+  }
+
+  /** Delegates to the wrapped ScheduledFuture. */
+  public boolean isDone()
+  {
+    return _delegate.isDone();
+  }
+
+  /** Delegates to the wrapped ScheduledFuture. */
+  public T get() throws InterruptedException, ExecutionException
+  {
+    return _delegate.get();
+  }
+
+  /** Delegates to the wrapped ScheduledFuture. */
+  public T get(long timeout, TimeUnit timeunit)
+      throws InterruptedException, ExecutionException, TimeoutException
+  {
+    return _delegate.get(timeout, timeunit);
+  }
+
+  /** Delegates to the wrapped ScheduledFuture. */
+  public long getDelay(TimeUnit timeunit)
+  {
+    return _delegate.getDelay(timeunit);
+  }
+
+  /** Delegates to the wrapped ScheduledFuture. */
+  public int compareTo(Delayed other)
+  {
+    return _delegate.compareTo(other);
+  }
+}
\ No newline at end of file
Propchange:
aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java
------------------------------------------------------------------------------
svn:mime-type = text/plain