Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java?rev=1822826&r1=1822825&r2=1822826&view=diff ============================================================================== --- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java (original) +++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java Wed Jan 31 20:10:03 2018 @@ -52,410 +52,407 @@ import org.osgi.framework.InvalidSyntaxE * it picks one up from the service registry then it shuts the internal one down. This doesn't fully meet * the spec for a SchedueledExecutorService. It does not properly implement shutdownNow, but this isn't used * by blueprint so for now that should be fine. - * + * <p> * <p>It also wraps the Runnables and Callables so when a task is canceled we quickly clean up memory rather - * than waiting for the target to get to the task and purge it. + * than waiting for the target to get to the task and purge it. * </p> */ -public class ScheduledExecutorServiceWrapper implements ScheduledExecutorService, SingleServiceListener -{ - public static interface ScheduledExecutorServiceFactory - { - public ScheduledExecutorService create(String name); - } - - private final AtomicReference<ScheduledExecutorService> _current = new AtomicReference<ScheduledExecutorService>(); - private SingleServiceTracker<ScheduledExecutorService> _tracked; - private final AtomicReference<ScheduledExecutorService> _default = new AtomicReference<ScheduledExecutorService>(); - private final AtomicBoolean _shutdown = new AtomicBoolean(); - private final Queue<Discardable<Runnable>> _unprocessedWork = new LinkedBlockingQueue<Discardable<Runnable>>(); - private final RWLock _lock = new RWLock(); - private final AtomicInteger _invokeEntryCount = new AtomicInteger(); - private final ScheduledExecutorServiceFactory _factory; - private final String _name; - - public ScheduledExecutorServiceWrapper(BundleContext context, String name, ScheduledExecutorServiceFactory sesf) - { - _name = name; - _factory = sesf; - try { - _tracked = new SingleServiceTracker<ScheduledExecutorService>(context, ScheduledExecutorService.class, "(aries.blueprint.poolName=" + _name + ")", this); - _tracked.open(); - } catch (InvalidSyntaxException e) { - // Just ignore and stick with the default one. - } - - if (_current.get() == null) { - _default.set(_factory.create(name)); - if (!!!_current.compareAndSet(null, _default.get())) { - _default.getAndSet(null).shutdown(); - } - } - } - - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException - { - long timeLeftToWait = unit.toMillis(timeout); - long pausePeriod = timeLeftToWait; - if (pausePeriod > 1000) pausePeriod = 1000; - while (!!!_unprocessedWork.isEmpty() && _invokeEntryCount.get() > 0 && timeLeftToWait > 0) { - Thread.sleep(pausePeriod); - timeLeftToWait -= pausePeriod; - if (timeLeftToWait < pausePeriod) pausePeriod = timeLeftToWait; - } - return _unprocessedWork.isEmpty() && _invokeEntryCount.get() > 0; - } - - public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) - throws InterruptedException - { - try { - return runUnlessShutdown(new Callable<List<Future<T>>>() { - - public List<Future<T>> call() throws Exception - { - _invokeEntryCount.incrementAndGet(); - try { - return _current.get().invokeAll(tasks); - } finally { - _invokeEntryCount.decrementAndGet(); - } - } - - }); - } catch (InterruptedException e) { throw e; - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, - final long timeout, - final TimeUnit unit) throws InterruptedException - { - try { - return runUnlessShutdown(new Callable<List<Future<T>>>() { - - public List<Future<T>> call() throws Exception - { - _invokeEntryCount.incrementAndGet(); - try { - return _current.get().invokeAll(tasks, timeout, unit); - } finally { - _invokeEntryCount.decrementAndGet(); - } - } - - }); - } catch (InterruptedException e) { throw e; - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, - ExecutionException - { - try { - return runUnlessShutdown(new Callable<T>() { - - public T call() throws Exception - { - _invokeEntryCount.incrementAndGet(); - try { - return _current.get().invokeAny(tasks); - } finally { - _invokeEntryCount.decrementAndGet(); - } - } - - }); - } catch (InterruptedException e) { throw e; - } catch (ExecutionException e) { throw e; - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException - { - try { - return runUnlessShutdown(new Callable<T>() { - - public T call() throws Exception - { - _invokeEntryCount.incrementAndGet(); - try { - return _current.get().invokeAny(tasks, timeout, unit); - } finally { - _invokeEntryCount.decrementAndGet(); - } - } - - }); - } catch (InterruptedException e) { throw e; - } catch (ExecutionException e) { throw e; - } catch (TimeoutException e) { throw e; - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - public boolean isShutdown() - { - return _shutdown.get(); - } - - public boolean isTerminated() - { - if (isShutdown()) return _unprocessedWork.isEmpty(); - else return false; - } - - public void shutdown() - { - _lock.runWriteOperation(new Runnable() { - - public void run() - { - _shutdown.set(true); +public class ScheduledExecutorServiceWrapper implements ScheduledExecutorService, SingleServiceListener { + public static interface ScheduledExecutorServiceFactory { + public ScheduledExecutorService create(String name); + } + + private final AtomicReference<ScheduledExecutorService> _current = new AtomicReference<ScheduledExecutorService>(); + private SingleServiceTracker<ScheduledExecutorService> _tracked; + private final AtomicReference<ScheduledExecutorService> _default = new AtomicReference<ScheduledExecutorService>(); + private final AtomicBoolean _shutdown = new AtomicBoolean(); + private final Queue<Discardable<Runnable>> _unprocessedWork = new LinkedBlockingQueue<Discardable<Runnable>>(); + private final RWLock _lock = new RWLock(); + private final AtomicInteger _invokeEntryCount = new AtomicInteger(); + private final ScheduledExecutorServiceFactory _factory; + private final String _name; + + public ScheduledExecutorServiceWrapper(BundleContext context, String name, ScheduledExecutorServiceFactory sesf) { + _name = name; + _factory = sesf; + try { + _tracked = new SingleServiceTracker<ScheduledExecutorService>(context, ScheduledExecutorService.class, "(aries.blueprint.poolName=" + _name + ")", this); + _tracked.open(); + } catch (InvalidSyntaxException e) { + // Just ignore and stick with the default one. + } + + if (_current.get() == null) { + _default.set(_factory.create(name)); + if (!!!_current.compareAndSet(null, _default.get())) { + _default.getAndSet(null).shutdown(); + } + } + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + long timeLeftToWait = unit.toMillis(timeout); + long pausePeriod = timeLeftToWait; + if (pausePeriod > 1000) pausePeriod = 1000; + while (!!!_unprocessedWork.isEmpty() && _invokeEntryCount.get() > 0 && timeLeftToWait > 0) { + Thread.sleep(pausePeriod); + timeLeftToWait -= pausePeriod; + if (timeLeftToWait < pausePeriod) pausePeriod = timeLeftToWait; + } + return _unprocessedWork.isEmpty() && _invokeEntryCount.get() > 0; + } + + public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) + throws InterruptedException { + try { + return runUnlessShutdown(new Callable<List<Future<T>>>() { + + public List<Future<T>> call() throws Exception { + _invokeEntryCount.incrementAndGet(); + try { + return _current.get().invokeAll(tasks); + } finally { + _invokeEntryCount.decrementAndGet(); + } + } + + }); + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, + final long timeout, + final TimeUnit unit) throws InterruptedException { + try { + return runUnlessShutdown(new Callable<List<Future<T>>>() { + + public List<Future<T>> call() throws Exception { + _invokeEntryCount.incrementAndGet(); + try { + return _current.get().invokeAll(tasks, timeout, unit); + } finally { + _invokeEntryCount.decrementAndGet(); + } + } + + }); + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, + ExecutionException { + try { + return runUnlessShutdown(new Callable<T>() { + + public T call() throws Exception { + _invokeEntryCount.incrementAndGet(); + try { + return _current.get().invokeAny(tasks); + } finally { + _invokeEntryCount.decrementAndGet(); + } + } + + }); + } catch (InterruptedException e) { + throw e; + } catch (ExecutionException e) { + throw e; + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + try { + return runUnlessShutdown(new Callable<T>() { + + public T call() throws Exception { + _invokeEntryCount.incrementAndGet(); + try { + return _current.get().invokeAny(tasks, timeout, unit); + } finally { + _invokeEntryCount.decrementAndGet(); + } + } + + }); + } catch (InterruptedException e) { + throw e; + } catch (ExecutionException e) { + throw e; + } catch (TimeoutException e) { + throw e; + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + public boolean isShutdown() { + return _shutdown.get(); + } + + public boolean isTerminated() { + if (isShutdown()) return _unprocessedWork.isEmpty(); + else return false; + } + + public void shutdown() { + _lock.runWriteOperation(new Runnable() { + + public void run() { + _shutdown.set(true); + ScheduledExecutorService s = _default.get(); + + if (s != null) s.shutdown(); + } + }); + } + + public List<Runnable> shutdownNow() { + try { + return _lock.runWriteOperation(new Callable<List<Runnable>>() { + + public List<Runnable> call() { + _shutdown.set(true); + + ScheduledExecutorService s = _default.get(); + + if (s != null) s.shutdownNow(); + + List<Runnable> runnables = new ArrayList<Runnable>(); + + for (Discardable<Runnable> r : _unprocessedWork) { + Runnable newRunnable = r.discard(); + if (newRunnable != null) { + runnables.add(newRunnable); + } + } + + return runnables; + } + + }); + } catch (Exception e) { + // This wont happen since our callable doesn't throw any exceptions, so we just return an empty list + return Collections.emptyList(); + } + } + + public <T> Future<T> submit(final Callable<T> task) { + try { + return runUnlessShutdown(new Callable<Future<T>>() { + + public Future<T> call() throws Exception { + DiscardableCallable<T> t = new DiscardableCallable<T>(task, _unprocessedWork); + try { + return new WrappedFuture<T>(_current.get().submit((Callable<T>) t), t); + } catch (RuntimeException e) { + t.discard(); + throw e; + } + } + + }); + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public Future<?> submit(final Runnable task) { + try { + return runUnlessShutdown(new Callable<Future<?>>() { + + public Future<?> call() { + DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork); + try { + return new WrappedFuture(_current.get().submit(t), t); + } catch (RuntimeException e) { + t.discard(); + throw e; + } + } + }); + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + public <T> Future<T> submit(final Runnable task, final T result) { + try { + return runUnlessShutdown(new Callable<Future<T>>() { + + public Future<T> call() { + DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork); + try { + return new WrappedFuture<T>(_current.get().submit(t, result), t); + } catch (RuntimeException e) { + t.discard(); + throw e; + } + } + }); + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + public void execute(final Runnable command) { + try { + runUnlessShutdown(new Callable<Object>() { + + public Object call() { + DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork); + try { + _current.get().execute(t); + } catch (RuntimeException e) { + t.discard(); + throw e; + } + return null; + } + }); + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) { + try { + return runUnlessShutdown(new Callable<ScheduledFuture<?>>() { + + public ScheduledFuture<?> call() { + DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork); + try { + return new WrappedScheduledFuture(_current.get().schedule(t, delay, unit), t); + } catch (RuntimeException e) { + t.discard(); + throw e; + } + } + }); + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) { + try { + return runUnlessShutdown(new Callable<ScheduledFuture<V>>() { + + public ScheduledFuture<V> call() { + DiscardableCallable<V> c = new DiscardableCallable<V>(callable, _unprocessedWork); + try { + return new WrappedScheduledFuture<V>(_current.get().schedule((Callable<V>) c, delay, unit), c); + } catch (RuntimeException e) { + c.discard(); + throw e; + } + } + }); + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, + final TimeUnit unit) { + try { + return runUnlessShutdown(new Callable<ScheduledFuture<?>>() { + + public ScheduledFuture<?> call() { + DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork); + try { + return new WrappedScheduledFuture(_current.get().scheduleAtFixedRate(t, initialDelay, period, unit), t); + } catch (RuntimeException e) { + t.discard(); + throw e; + } + } + }); + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, + final TimeUnit unit) { + try { + return runUnlessShutdown(new Callable<ScheduledFuture<?>>() { + + public ScheduledFuture<?> call() { + DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork); + try { + return new WrappedScheduledFuture(_current.get().scheduleWithFixedDelay(t, initialDelay, delay, unit), t); + } catch (RuntimeException e) { + t.discard(); + throw e; + } + } + }); + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } + + public void serviceFound() { ScheduledExecutorService s = _default.get(); - - if (s != null) s.shutdown(); - } - }); - } - - public List<Runnable> shutdownNow() - { - try { - return _lock.runWriteOperation(new Callable<List<Runnable>>() { - - public List<Runnable> call() - { - _shutdown.set(true); - - ScheduledExecutorService s = _default.get(); - - if (s != null) s.shutdownNow(); - - List<Runnable> runnables = new ArrayList<Runnable>(); - - for (Discardable<Runnable> r : _unprocessedWork) { - Runnable newRunnable = r.discard(); - if (newRunnable != null) { - runnables.add(newRunnable); + if (_current.compareAndSet(s, _tracked.getService())) { + if (s != null) { + if (_default.compareAndSet(s, null)) { + s.shutdown(); + } } - } - - return runnables; - } - - }); - } catch (Exception e) { - // This wont happen since our callable doesn't throw any exceptions, so we just return an empty list - return Collections.emptyList(); - } - } - - public <T> Future<T> submit(final Callable<T> task) - { - try { - return runUnlessShutdown(new Callable<Future<T>>() { - - public Future<T> call() throws Exception - { - DiscardableCallable<T> t = new DiscardableCallable<T>(task, _unprocessedWork); - try { - return new WrappedFuture<T>(_current.get().submit((Callable<T>)t), t) ; - } catch (RuntimeException e) { - t.discard(); - throw e; - } - } - - }); - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public Future<?> submit(final Runnable task) - { - try { - return runUnlessShutdown(new Callable<Future<?>>() { - - public Future<?> call() - { - DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork); - try { - return new WrappedFuture(_current.get().submit(t), t); - } catch (RuntimeException e) { - t.discard(); - throw e; - } - } - }); - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - public <T> Future<T> submit(final Runnable task, final T result) - { - try { - return runUnlessShutdown(new Callable<Future<T>>() { - - public Future<T> call() - { - DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork); - try { - return new WrappedFuture<T>(_current.get().submit(t, result), t); - } catch (RuntimeException e) { - t.discard(); - throw e; - } - } - }); - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - public void execute(final Runnable command) - { - try { - runUnlessShutdown(new Callable<Object>() { - - public Object call() - { - DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork); - try { - _current.get().execute(t); - } catch (RuntimeException e) { - t.discard(); - throw e; - } - return null; - } - }); - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) - { - try { - return runUnlessShutdown(new Callable<ScheduledFuture<?>>() { - - public ScheduledFuture<?> call() - { - DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork); - try { - return new WrappedScheduledFuture(_current.get().schedule(t, delay, unit), t); - } catch (RuntimeException e) { - t.discard(); - throw e; - } - } - }); - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) - { - try { - return runUnlessShutdown(new Callable<ScheduledFuture<V>>() { - - public ScheduledFuture<V> call() - { - DiscardableCallable<V> c = new DiscardableCallable<V>(callable, _unprocessedWork); - try { - return new WrappedScheduledFuture<V>(_current.get().schedule((Callable<V>)c, delay, unit), c); - } catch (RuntimeException e) { - c.discard(); - throw e; - } - } - }); - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, - final TimeUnit unit) - { - try { - return runUnlessShutdown(new Callable<ScheduledFuture<?>>() { - - public ScheduledFuture<?> call() - { - DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork); - try { - return new WrappedScheduledFuture(_current.get().scheduleAtFixedRate(t, initialDelay, period, unit), t); - } catch (RuntimeException e) { - t.discard(); - throw e; - } - } - }); - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, - final TimeUnit unit) - { - try { - return runUnlessShutdown(new Callable<ScheduledFuture<?>>() { - - public ScheduledFuture<?> call() - { - DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork); - try { - return new WrappedScheduledFuture(_current.get().scheduleWithFixedDelay(t, initialDelay, delay, unit), t); - } catch (RuntimeException e) { - t.discard(); - throw e; - } - } - }); - } catch (Exception e) { throw new RejectedExecutionException(); } - } - - public void serviceFound() - { - ScheduledExecutorService s = _default.get(); - if (_current.compareAndSet(s, _tracked.getService())) { - if (s != null) { - if (_default.compareAndSet(s, null)) { - s.shutdown(); - } - } - } - } - - // TODO when lost or replaced we need to move work to the "new" _current. This is a huge change because the futures are not currently stored. - public void serviceLost() - { - ScheduledExecutorService s = _default.get(); - - if (s == null) { - s = _factory.create(_name); - if (_default.compareAndSet(null, s)) { - _current.set(s); - } - } - } - - public void serviceReplaced() - { - _current.set(_tracked.getService()); - } - - private <T> T runUnlessShutdown(final Callable<T> call) throws InterruptedException, ExecutionException, TimeoutException - { - try { - return _lock.runReadOperation(new Callable<T>() - { - public T call() throws Exception - { - if (isShutdown()) throw new RejectedExecutionException(); - return call.call(); + } + } + + // TODO when lost or replaced we need to move work to the "new" _current. This is a huge change because the futures are not currently stored. + public void serviceLost() { + ScheduledExecutorService s = _default.get(); + + if (s == null) { + s = _factory.create(_name); + if (_default.compareAndSet(null, s)) { + _current.set(s); } - }); - } catch (InterruptedException e) { throw e; - } catch (ExecutionException e) { throw e; - } catch (TimeoutException e) { throw e; - } catch (RuntimeException e) { throw e; - } catch (Exception e) { throw new RejectedExecutionException(); } - } + } + } + + public void serviceReplaced() { + _current.set(_tracked.getService()); + } + + private <T> T runUnlessShutdown(final Callable<T> call) throws InterruptedException, ExecutionException, TimeoutException { + try { + return _lock.runReadOperation(new Callable<T>() { + public T call() throws Exception { + if (isShutdown()) throw new RejectedExecutionException(); + return call.call(); + } + }); + } catch (InterruptedException e) { + throw e; + } catch (ExecutionException e) { + throw e; + } catch (TimeoutException e) { + throw e; + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RejectedExecutionException(); + } + } } \ No newline at end of file
Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java?rev=1822826&r1=1822825&r2=1822826&view=diff ============================================================================== --- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java (original) +++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java Wed Jan 31 20:10:03 2018 @@ -19,5 +19,5 @@ package org.apache.aries.blueprint.utils.threading.impl; public interface Discardable<T> { - public <T> T discard(); + public <T> T discard(); } \ No newline at end of file Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java?rev=1822826&r1=1822825&r2=1822826&view=diff ============================================================================== --- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java (original) +++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java Wed Jan 31 20:10:03 2018 @@ -25,44 +25,39 @@ import java.util.concurrent.Cancellation import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; -public class DiscardableCallable<V> implements Callable<V>, Runnable, Discardable<Runnable> -{ - private AtomicReference<Callable<V>> c = new AtomicReference<Callable<V>>(); - private Queue<Discardable<Runnable>> _removeFromListOnCall; - - public DiscardableCallable(Callable<V> call, Queue<Discardable<Runnable>> _unprocessedWork) { - c.set(call); - _removeFromListOnCall = _unprocessedWork; - _removeFromListOnCall.add(this); - } - - private DiscardableCallable(Callable<V> call) - { - c.set(call); - _removeFromListOnCall = new LinkedBlockingQueue<Discardable<Runnable>>(); - } - - public Runnable discard() - { - _removeFromListOnCall.remove(this); - return new DiscardableCallable<V>(c.getAndSet(null)) ; - } - - public V call() throws Exception - { - _removeFromListOnCall.remove(this); - Callable<V> call = c.get(); - if (call != null) { - return call.call(); +public class DiscardableCallable<V> implements Callable<V>, Runnable, Discardable<Runnable> { + private AtomicReference<Callable<V>> c = new AtomicReference<Callable<V>>(); + private Queue<Discardable<Runnable>> _removeFromListOnCall; + + public DiscardableCallable(Callable<V> call, Queue<Discardable<Runnable>> _unprocessedWork) { + c.set(call); + _removeFromListOnCall = _unprocessedWork; + _removeFromListOnCall.add(this); + } + + private DiscardableCallable(Callable<V> call) { + c.set(call); + _removeFromListOnCall = new LinkedBlockingQueue<Discardable<Runnable>>(); + } + + public Runnable discard() { + _removeFromListOnCall.remove(this); + return new DiscardableCallable<V>(c.getAndSet(null)); + } + + public V call() throws Exception { + _removeFromListOnCall.remove(this); + Callable<V> call = c.get(); + if (call != null) { + return call.call(); + } + throw new CancellationException(); } - throw new CancellationException(); - } - public void run() - { - try { - call(); - } catch (Exception e) { + public void run() { + try { + call(); + } catch (Exception e) { + } } - } } \ No newline at end of file Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java?rev=1822826&r1=1822825&r2=1822826&view=diff ============================================================================== --- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java (original) +++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java Wed Jan 31 20:10:03 2018 @@ -23,35 +23,31 @@ import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; -public class DiscardableRunnable implements Runnable, Discardable<Runnable> -{ - private AtomicReference<Runnable> r = new AtomicReference<Runnable>(); - private Queue<Discardable<Runnable>> _removeFromListOnRun; - - public DiscardableRunnable(Runnable run, Queue<Discardable<Runnable>> _unprocessedWork) { - r.set(run); - _removeFromListOnRun = _unprocessedWork; - _removeFromListOnRun.add(this); - } +public class DiscardableRunnable implements Runnable, Discardable<Runnable> { + private AtomicReference<Runnable> r = new AtomicReference<Runnable>(); + private Queue<Discardable<Runnable>> _removeFromListOnRun; - private DiscardableRunnable(Runnable run) - { - r.set(run); - _removeFromListOnRun = new LinkedBlockingQueue<Discardable<Runnable>>(); - } + public DiscardableRunnable(Runnable run, Queue<Discardable<Runnable>> _unprocessedWork) { + r.set(run); + _removeFromListOnRun = _unprocessedWork; + _removeFromListOnRun.add(this); + } + + private DiscardableRunnable(Runnable run) { + r.set(run); + _removeFromListOnRun = new LinkedBlockingQueue<Discardable<Runnable>>(); + } - public void run() - { - _removeFromListOnRun.remove(this); - Runnable run = r.get(); - if (run != null) { - run.run(); + public void run() { + _removeFromListOnRun.remove(this); + Runnable run = r.get(); + if (run != null) { + run.run(); + } } - } - public Runnable discard() - { - _removeFromListOnRun.remove(this); - return new DiscardableRunnable(r.getAndSet(null)); - } + public Runnable discard() { + _removeFromListOnRun.remove(this); + return new DiscardableRunnable(r.getAndSet(null)); + } } \ No newline at end of file Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java?rev=1822826&r1=1822825&r2=1822826&view=diff ============================================================================== --- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java (original) +++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java Wed Jan 31 20:10:03 2018 @@ -23,43 +23,37 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public class WrappedFuture<T> implements Future<T> -{ - private Discardable<?> _discardable; - private Future<T> _future; - - public WrappedFuture(Future<T> f, Discardable<?> d) { - _future = f; - _discardable = d; - } - - public boolean cancel(boolean arg0) - { - boolean result = _future.cancel(arg0); - - if (result) _discardable.discard(); - - return result; - } - - public T get() throws InterruptedException, ExecutionException - { - return _future.get(); - } - - public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException, - TimeoutException - { - return _future.get(timeout, timeunit); - } - - public boolean isCancelled() - { - return _future.isCancelled(); - } - - public boolean isDone() - { - return _future.isDone(); - } +public class WrappedFuture<T> implements Future<T> { + private Discardable<?> _discardable; + private Future<T> _future; + + public WrappedFuture(Future<T> f, Discardable<?> d) { + _future = f; + _discardable = d; + } + + public boolean cancel(boolean arg0) { + boolean result = _future.cancel(arg0); + + if (result) _discardable.discard(); + + return result; + } + + public T get() throws InterruptedException, ExecutionException { + return _future.get(); + } + + public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException, + TimeoutException { + return _future.get(timeout, timeunit); + } + + public boolean isCancelled() { + return _future.isCancelled(); + } + + public boolean isDone() { + return _future.isDone(); + } } \ No newline at end of file Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java?rev=1822826&r1=1822825&r2=1822826&view=diff ============================================================================== --- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java (original) +++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java Wed Jan 31 20:10:03 2018 @@ -25,53 +25,45 @@ import java.util.concurrent.ScheduledFut import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public class WrappedScheduledFuture<T> implements ScheduledFuture<T> -{ - private Discardable<?> _discardable; - private ScheduledFuture<T> _future; - - public WrappedScheduledFuture(ScheduledFuture<T> f, Discardable<?> d) { - _future = f; - _discardable = d; - } - - public long getDelay(TimeUnit timeunit) - { - return _future.getDelay(timeunit); - } - - public int compareTo(Delayed other) - { - return _future.compareTo(other); - } - - public boolean cancel(boolean arg0) - { - boolean result = _future.cancel(arg0); - - if (result) _discardable.discard(); - - return result; - } - - public T get() throws InterruptedException, ExecutionException - { - return _future.get(); - } - - public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException, - TimeoutException - { - return _future.get(timeout, timeunit); - } - - public boolean isCancelled() - { - return _future.isCancelled(); - } - - public boolean isDone() - { - return _future.isDone(); - } +public class WrappedScheduledFuture<T> implements ScheduledFuture<T> { + private Discardable<?> _discardable; + private ScheduledFuture<T> _future; + + public WrappedScheduledFuture(ScheduledFuture<T> f, Discardable<?> d) { + _future = f; + _discardable = d; + } + + public long getDelay(TimeUnit timeunit) { + return _future.getDelay(timeunit); + } + + public int compareTo(Delayed other) { + return _future.compareTo(other); + } + + public boolean cancel(boolean arg0) { + boolean result = _future.cancel(arg0); + + if (result) _discardable.discard(); + + return result; + } + + public T get() throws InterruptedException, ExecutionException { + return _future.get(); + } + + public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException, + TimeoutException { + return _future.get(timeout, timeunit); + } + + public boolean isCancelled() { + return _future.isCancelled(); + } + + public boolean isDone() { + return _future.isDone(); + } } \ No newline at end of file Modified: aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java?rev=1822826&r1=1822825&r2=1822826&view=diff ============================================================================== --- aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java (original) +++ aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java Wed Jan 31 20:10:03 2018 @@ -37,30 +37,37 @@ import org.xml.sax.SAXException; public abstract class AbstractBlueprintTest extends TestCase { protected ComponentDefinitionRegistryImpl parse(String name) throws Exception { - NamespaceHandlerSet handlers = new NamespaceHandlerSet() { + NamespaceHandlerSet handlers = new NamespaceHandlerSet() { public Set<URI> getNamespaces() { return null; } + public NamespaceHandler getNamespaceHandler(URI namespace) { if (ExtNamespaceHandler.isExtNamespace(namespace.toString())) { - return new ExtNamespaceHandler(); + return new ExtNamespaceHandler(); } else { - return null; + return null; } } + public void removeListener(NamespaceHandlerSet.Listener listener) { } + public Schema getSchema() throws SAXException, IOException { return null; } + public Schema getSchema(Map<String, String> locations) throws SAXException, IOException { - return null; + return null; } + public boolean isComplete() { return false; } + public void addListener(NamespaceHandlerSet.Listener listener) { } + public void destroy() { } };
