Author: peter_firmstone Date: Fri Jan 20 09:10:19 2017 New Revision: 1779587
URL: http://svn.apache.org/viewvc?rev=1779587&view=rev Log: RIVER-447 Leaked Executor Service Threads in LoadClass Commit reviewed patch. Modified: river/jtsk/trunk/src/net/jini/loader/LoadClass.java river/jtsk/trunk/src/org/apache/river/concurrent/RC.java river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java Modified: river/jtsk/trunk/src/net/jini/loader/LoadClass.java URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/net/jini/loader/LoadClass.java?rev=1779587&r1=1779586&r2=1779587&view=diff ============================================================================== --- river/jtsk/trunk/src/net/jini/loader/LoadClass.java (original) +++ river/jtsk/trunk/src/net/jini/loader/LoadClass.java Fri Jan 20 09:10:19 2017 @@ -20,6 +20,8 @@ package net.jini.loader; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.Collection; +import java.util.List; import org.apache.river.concurrent.RC; import org.apache.river.concurrent.Ref; import org.apache.river.concurrent.Referrer; @@ -28,10 +30,12 @@ import java.util.concurrent.ConcurrentHa import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.river.thread.NamedThreadFactory; /** @@ -97,15 +101,7 @@ public class LoadClass { } ExecutorService exec = loaderMap.get(loader); if (exec == null) { - exec = new ThreadPoolExecutor( - 1, - 1, - 0, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new NamedThreadFactory(loader.toString(), false), - new ThreadPoolExecutor.CallerRunsPolicy() - ); + exec = new AutoCloseableExecutor(loader.toString()); ExecutorService existed = loaderMap.putIfAbsent(loader, exec); if (existed != null) { exec = existed; @@ -178,5 +174,92 @@ public class LoadClass { } } + + private static class AutoCloseableExecutor implements ExecutorService, AutoCloseable { + + private final ExecutorService decorated; + + AutoCloseableExecutor(String loaderName){ + decorated = new ThreadPoolExecutor( + 1, + 1, + 0, + TimeUnit.SECONDS, + new LinkedBlockingQueue(), + new NamedThreadFactory(loaderName, false), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + + @Override + public void shutdown() { + decorated.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + return decorated.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return decorated.isShutdown(); + } + + @Override + public boolean isTerminated() { + return decorated.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return decorated.awaitTermination(timeout, unit); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return decorated.submit(task); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return decorated.submit(task, result); + } + + @Override + public Future<?> submit(Runnable task) { + return decorated.submit(task); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + return decorated.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return decorated.invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { + return decorated.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return decorated.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + decorated.execute(command); + } + + @Override + public void close() throws SecurityException { + decorated.shutdown(); + } + + } } Modified: river/jtsk/trunk/src/org/apache/river/concurrent/RC.java URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/concurrent/RC.java?rev=1779587&r1=1779586&r2=1779587&view=diff ============================================================================== --- river/jtsk/trunk/src/org/apache/river/concurrent/RC.java (original) +++ river/jtsk/trunk/src/org/apache/river/concurrent/RC.java Fri Jan 20 09:10:19 2017 @@ -80,8 +80,9 @@ import java.util.Iterator; * removal of enqued references is performed by background Executor threads. * Your chosen encapsulated {@link Collection} must also be mutable. * Objects will be removed automatically from encapsulated Collections when - * they are eligible for garbage collection, external synchronisation of - * decorated collections is not supported. + * they are eligible for garbage collection, object's that implement AutoCloseable + * will automatically have their resources freed after removal, + * external synchronisation of decorated collections is not supported. * </p><p> * If you're using Iterators, you must synchronise on the underlying Collection * or Map, if iterating through keys or values, this doesn't apply to Modified: river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java?rev=1779587&r1=1779586&r2=1779587&view=diff ============================================================================== --- river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java (original) +++ river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java Fri Jan 20 09:10:19 2017 @@ -15,6 +15,7 @@ package org.apache.river.concurrent; +import java.io.IOException; import java.lang.ref.PhantomReference; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; @@ -190,6 +191,15 @@ class ReferenceProcessor<T> implements R try { for ( Object t = queue.poll(); t != null; t = queue.poll()){ col.remove(t); + if (t instanceof Referrer){ + Object referent = ((Referrer)t).get(); + if (referent instanceof AutoCloseable){ + try{ + // Release any resources held by the referent. + ((AutoCloseable) referent).close(); + } catch (Exception ex){} // Ignore + } + } } }catch(Exception e){ e.printStackTrace(System.err);
