Author: peter_firmstone Date: Fri Jan 3 12:01:35 2014 New Revision: 1555061
URL: http://svn.apache.org/r1555061 Log: Replaced TaskManager in Mahalo with ExecutorService RetryTask now implements RunnableFuture and can be returned from an ExecutorService for cancellation. Replaced JRMPExporter with BasicJeriExporter in the qa test suite's NonActivatibleGroupImpl Reduced logging output to FINEST in RFC3986URLClassLoader Added ExtensibleExecutorService to allow encapsulation and extension of ExecutorService's defined by Configuration Added: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ExtensibleExecutorService.java Modified: river/jtsk/skunk/qa_refactor/trunk/build.xml river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/NonActivatableGroupImpl.java river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/resources/qaDefaults.properties river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/mahalo/RandomStressTest.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/AbortJob.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/CommitJob.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/Job.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/ParticipantTask.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/PrepareAndCommitJob.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/PrepareJob.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/SettlerTask.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImpl.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImplInitializer.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerTransaction.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImpl.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImplInit.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Txn.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/net/RFC3986URLClassLoader.java Modified: river/jtsk/skunk/qa_refactor/trunk/build.xml URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/build.xml?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/build.xml (original) +++ river/jtsk/skunk/qa_refactor/trunk/build.xml Fri Jan 3 12:01:35 2014 @@ -814,6 +814,8 @@ <arg path="${build.classes.dir}/com/sun/jini/system"/> <!--qa suite dep--> <arg path="${build.classes.dir}/net/jini"/> <arg path="${build.classes.dir}/org/apache/river/impl/lease"/> + <arg path="${build.classes.dir}/org/apache/river/impl/thread"/> + <arg path="${build.classes.dir}/org/apache/river/impl"/> <arg value="com.sun.jini.logging.Levels"/> <arg value="com.sun.jini.phoenix.ActivationAdmin"/> <arg value="com.sun.jini.start.LifeCycle"/> Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/NonActivatableGroupImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/NonActivatableGroupImpl.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/NonActivatableGroupImpl.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/harness/NonActivatableGroupImpl.java Fri Jan 3 12:01:35 2014 @@ -34,6 +34,10 @@ import net.jini.config.Configuration; import net.jini.config.ConfigurationException; import net.jini.config.ConfigurationProvider; import net.jini.export.Exporter; +import net.jini.jeri.BasicILFactory; +import net.jini.jeri.BasicJeriExporter; +import net.jini.jeri.InvocationLayerFactory; +import net.jini.jeri.tcp.TcpServerEndpoint; import net.jini.jrmp.JrmpExporter; import org.apache.river.api.security.CombinerSecurityManager; @@ -104,7 +108,7 @@ class NonActivatableGroupImpl { * at construction time using a <code>JrmpExporter</code>. */ public GroupImpl() { - this (new JrmpExporter()); + this (new BasicJeriExporter(TcpServerEndpoint.getInstance(0), new BasicILFactory())); export(); } Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/resources/qaDefaults.properties URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/resources/qaDefaults.properties?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/resources/qaDefaults.properties (original) +++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/qa/resources/qaDefaults.properties Fri Jan 3 12:01:35 2014 @@ -271,8 +271,7 @@ altClasspath=<harnessJar>$:${com.sun.jin # Arguments are comma separated. To embed a comma in # an argument, use "+,". # -testjvmargs=-client,\ -${testjvmargs} +testjvmargs=${testjvmargs} # # defined for historical reasons. Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/mahalo/RandomStressTest.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/mahalo/RandomStressTest.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/mahalo/RandomStressTest.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/mahalo/RandomStressTest.java Fri Jan 3 12:01:35 2014 @@ -220,7 +220,7 @@ public class RandomStressTest extends Tx if (alltasks[i] == null) { continue; } - boolean done = alltasks[i].complete(); + boolean done = alltasks[i].isDone(); if (DEBUG) { logger.log(Level.INFO, Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/AbortJob.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/AbortJob.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/AbortJob.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/AbortJob.java Fri Jan 3 12:01:35 2014 @@ -18,7 +18,6 @@ package com.sun.jini.mahalo; import com.sun.jini.mahalo.log.ClientLog; -import com.sun.jini.thread.TaskManager; import com.sun.jini.thread.WakeupManager; import java.rmi.activation.ActivateFailedException; @@ -30,6 +29,7 @@ import java.rmi.ConnectIOException; import java.rmi.AccessException; import java.rmi.ConnectException; import java.util.Iterator; +import java.util.concurrent.ExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -81,7 +81,7 @@ class AbortJob extends Job implements Tr * @see com.sun.jini.mahalo.log.ClientLog * @see net.jini.core.transaction.server.TransactionParticipant */ - public AbortJob(Transaction tr, TaskManager pool, + public AbortJob(Transaction tr, ExecutorService pool, WakeupManager wm, ClientLog log, ParticipantHandle[] handles) { super(pool, wm); Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/CommitJob.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/CommitJob.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/CommitJob.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/CommitJob.java Fri Jan 3 12:01:35 2014 @@ -30,6 +30,7 @@ import java.rmi.ConnectIOException; import java.rmi.AccessException; import java.rmi.ConnectException; import java.util.Iterator; +import java.util.concurrent.ExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -78,7 +79,7 @@ class CommitJob extends Job implements T * @see com.sun.jini.mahalo.log.ClientLog * @see net.jini.core.transaction.server.TransactionParticipant */ - public CommitJob(Transaction tr, TaskManager pool, + public CommitJob(Transaction tr, ExecutorService pool, WakeupManager wm, ClientLog log, ParticipantHandle[] handles) { super(pool, wm); Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/Job.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/Job.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/Job.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/Job.java Fri Jan 3 12:01:35 2014 @@ -17,11 +17,13 @@ */ package com.sun.jini.mahalo; -import com.sun.jini.thread.TaskManager; import com.sun.jini.thread.WakeupManager; +import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.logging.Level; @@ -36,31 +38,30 @@ import java.util.logging.Logger; * */ abstract class Job { - private final TaskManager pool; + private final ExecutorService pool; private final WakeupManager wm; private final AtomicInteger pend; final ConcurrentMap<Integer,Object> results; volatile AtomicIntegerArray attempts = null; - private final ConcurrentMap<Object,Integer> tasks; //used to maintain account + private final ConcurrentMap<Runnable,Integer> tasks; //used to maintain account //of the tasks for which //the job is responsible // sync on tasks. - static final Logger logger = TxnManagerImpl.participantLogger; /** * Create the <code>Job</code> object giving it the - * <code>TaskManager</code> responsible for the pool of + * <code>ExecutorService</code> responsible for the pool of * threads which perform the necessary work. * - * @param pool the <code>TaskManager</code> which provides the threads + * @param pool the <code>ExecutorService</code> which provides the threads */ - public Job(TaskManager pool, WakeupManager wm) { + Job(ExecutorService pool, WakeupManager wm) { this.wm = wm; this.pool = pool; pend = new AtomicInteger(-1); results = new ConcurrentHashMap<Integer,Object>(); - tasks = new ConcurrentHashMap<Object,Integer>(); + tasks = new ConcurrentHashMap<Runnable,Integer>(); } @@ -101,7 +102,7 @@ abstract class Job { /** - * Given a <code>TaskManager.Task</code>, this method + * Given a <code>Runnable</code>, this method * returns the current number of attempts it has made. * * @param who The task for which the number of attempts @@ -140,35 +141,36 @@ abstract class Job { /** * Schedules tasks for execution */ - public void scheduleTasks() { + public final void scheduleTasks() { Runnable[] tmp = createTasks(); - if (tmp != null) { - int length = tmp.length; - if (logger.isLoggable(Level.FINEST)) { - logger.log(Level.FINEST, - "Job:scheduleTasks with {0} tasks", - Integer.valueOf(length)); - } - - results.clear(); - attempts = new AtomicIntegerArray(length); - setPending(length); - - for (int i = 0; i < length; i++) { - - //Record the position if each - //task for later use when assembling - //the partial results - tasks.put(tmp[i],Integer.valueOf(i)); - attempts.set(i,0); - pool.add(tmp[i]); + synchronized (this){ + if (tmp != null) { + int length = tmp.length; if (logger.isLoggable(Level.FINEST)) { logger.log(Level.FINEST, - "Job:scheduleTasks added {0} to thread pool", - tmp[i]); + "Job:scheduleTasks with {0} tasks", + Integer.valueOf(length)); } - } + results.clear(); + tasks.clear(); + attempts = new AtomicIntegerArray(length); + setPending(length); + + for (int i = 0; i < length; i++) { + //Record the position if each + //task for later use when assembling + //the partial results + tasks.put(tmp[i],Integer.valueOf(i)); + attempts.set(i,0); + pool.submit(tmp[i], (Object) null); + if (logger.isLoggable(Level.FINEST)) { + logger.log(Level.FINEST, + "Job:scheduleTasks added {0} to thread pool", + tmp[i]); + } + } + } } } @@ -222,9 +224,7 @@ abstract class Job { "Job:setPending notifying, pending = {0}", Integer.valueOf(pend.get())); } - synchronized (this){ - notifyAll(); - } + notifyAll(); } } @@ -245,11 +245,11 @@ abstract class Job { /** - * Returns a reference to the <code>TaskManager</code> which + * Returns a reference to the <code>ExecutorService</code> which * supplies the threads used to executed tasks created by * this <code>Job</code> */ - protected TaskManager getPool() { + protected ExecutorService getPool() { return pool; } @@ -324,20 +324,16 @@ abstract class Job { * Halt all of the work being performed by * the <code>Job</code> */ - public void stop() { - Set s = tasks.keySet(); - Object[] vals = s.toArray(); - tasks.clear(); - - //Remove and interrupt all tasks - int l = vals.length; - for (int i = 0; i < l; i++) { - Runnable t = (Runnable) vals[i]; - pool.remove(t); - } - + public final synchronized void stop() { + Set<Runnable> s = tasks.keySet(); + Iterator<Runnable> it = s.iterator(); + while (it.hasNext()){ + Runnable r = it.next(); + if (r instanceof Future) ((Future)r).cancel(false); + } //Erase record of tasks, results and the //counting mechanism + tasks.clear(); setPending(-1); results.clear(); } Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/ParticipantTask.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/ParticipantTask.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/ParticipantTask.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/ParticipantTask.java Fri Jan 3 12:01:35 2014 @@ -18,9 +18,9 @@ package com.sun.jini.mahalo; import com.sun.jini.thread.RetryTask; -import com.sun.jini.thread.TaskManager; import com.sun.jini.thread.WakeupManager; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.logging.Level; import java.util.logging.Logger; import net.jini.core.transaction.server.TransactionParticipant; @@ -32,7 +32,6 @@ import net.jini.core.transaction.server. * @author Sun Microsystems, Inc. * * @see TransactionParticipant - * @see TaskManager */ class ParticipantTask extends RetryTask { final ParticipantHandle handle; @@ -52,7 +51,7 @@ class ParticipantTask extends RetryTask * <code>TransactionParticipant</code> with which * this task interacts. */ - public ParticipantTask(TaskManager manager, WakeupManager wm, + public ParticipantTask(ExecutorService manager, WakeupManager wm, Job myjob, ParticipantHandle handle) { super(manager, wm); this.myjob = myjob; Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/PrepareAndCommitJob.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/PrepareAndCommitJob.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/PrepareAndCommitJob.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/PrepareAndCommitJob.java Fri Jan 3 12:01:35 2014 @@ -22,6 +22,7 @@ import com.sun.jini.mahalo.log.ClientLog import com.sun.jini.thread.TaskManager; import com.sun.jini.thread.WakeupManager; import java.rmi.RemoteException; +import java.util.concurrent.ExecutorService; import java.util.logging.Level; import java.util.logging.Logger; import net.jini.core.transaction.Transaction; @@ -90,7 +91,7 @@ class PrepareAndCommitJob extends Job im * @see com.sun.jini.mahalo.log.ClientLog * @see net.jini.core.transaction.server.TransactionParticipant */ - public PrepareAndCommitJob(Transaction tr, TaskManager pool, + public PrepareAndCommitJob(Transaction tr, ExecutorService pool, WakeupManager wm, ClientLog log, ParticipantHandle handle) { super(pool, wm); Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/PrepareJob.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/PrepareJob.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/PrepareJob.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/PrepareJob.java Fri Jan 3 12:01:35 2014 @@ -23,6 +23,7 @@ import com.sun.jini.thread.TaskManager; import com.sun.jini.thread.WakeupManager; import java.rmi.RemoteException; import java.util.Iterator; +import java.util.concurrent.ExecutorService; import java.util.logging.Level; import java.util.logging.Logger; import net.jini.core.transaction.Transaction; @@ -77,7 +78,7 @@ class PrepareJob extends Job implements * @see com.sun.jini.mahalo.log.ClientLog * @see net.jini.core.transaction.server.TransactionParticipant */ - public PrepareJob(Transaction tr, TaskManager pool, + public PrepareJob(Transaction tr, ExecutorService pool, WakeupManager wm, ClientLog log, ParticipantHandle[] handles) { super(pool, wm); Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/SettlerTask.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/SettlerTask.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/SettlerTask.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/SettlerTask.java Fri Jan 3 12:01:35 2014 @@ -23,6 +23,7 @@ import com.sun.jini.thread.TaskManager; import com.sun.jini.thread.WakeupManager; import java.rmi.NoSuchObjectException; import java.rmi.RemoteException; +import java.util.concurrent.ExecutorService; import java.util.logging.Level; import java.util.logging.Logger; import net.jini.core.transaction.TransactionException; @@ -37,7 +38,7 @@ import net.jini.core.transaction.server. * */ -public class SettlerTask extends RetryTask implements TransactionConstants { +class SettlerTask extends RetryTask implements TransactionConstants { private final long tid; private final TransactionManager txnmgr; @@ -60,7 +61,7 @@ public class SettlerTask extends RetryTa * * @param tid transaction ID */ - public SettlerTask(TaskManager manager, WakeupManager wm, + SettlerTask(ExecutorService manager, WakeupManager wm, TransactionManager txnmgr, long tid) { super(manager, wm); Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImpl.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImpl.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImpl.java Fri Jan 3 12:01:35 2014 @@ -63,8 +63,13 @@ import java.util.Iterator; import java.util.Map; import java.util.Queue; import java.util.Vector; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -109,6 +114,8 @@ import net.jini.security.Security; import net.jini.security.SecurityContext; import net.jini.security.proxytrust.ServerProxyTrust; import net.jini.security.TrustVerifier; +import org.apache.river.impl.thread.ExtensibleExecutorService; +import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory; /** * An implementation of the Jini Transaction Specification. @@ -159,10 +166,10 @@ class TxnManagerImpl /*extends RemoteSer /* TxnManagerTransaction objects. Tasks on a given */ /* TaskManager which create Tasks cannot be on the */ /* same TaskManager as their child Tasks. */ - private final TaskManager settlerpool; + private final ExecutorService settlerpool; /** wakeup manager for <code>SettlerTask</code> */ private final WakeupManager settlerWakeupMgr; - private final TaskManager taskpool; + private final ExecutorService taskpool; /** wakeup manager for <code>ParticipantTask</code> */ private final WakeupManager taskWakeupMgr; /* Map of transaction ids are their associated, internal @@ -362,9 +369,17 @@ class TxnManagerImpl /*extends RemoteSer txnLeasePeriodPolicy = init.txnLeasePeriodPolicy; persistenceDirectory = init.persistenceDirectory; joinStateManager = init.joinStateManager; - settlerpool = init.settlerpool; + settlerpool + = new ExtensibleExecutorService( + init.settlerpool, + new FutureFactory() + ); settlerWakeupMgr = init.settlerWakeupMgr; - taskpool = init.taskpool; + taskpool + = new ExtensibleExecutorService( + init.taskpool, + new FutureFactory() + ); taskWakeupMgr = init.taskWakeupMgr; topUuid = init.topUuid; context = init.context; @@ -911,7 +926,7 @@ class TxnManagerImpl /*extends RemoteSer SettlerTask task = new SettlerTask(settlerpool, settlerWakeupMgr, this, tid); - settlerpool.add(task); + settlerpool.execute(task); if (settleThread.hasBeenInterrupted()) throw new InterruptedException("settleTxns interrupted"); @@ -1379,14 +1394,14 @@ class TxnManagerImpl /*extends RemoteSer if(destroyLogger.isLoggable(Level.FINEST)) { destroyLogger.log(Level.FINEST,"Terminating settlerpool."); } - settlerpool.terminate(); + settlerpool.shutdown(); settlerWakeupMgr.stop(); settlerWakeupMgr.cancelAll(); if(destroyLogger.isLoggable(Level.FINEST)) { destroyLogger.log(Level.FINEST,"Terminating taskpool."); } - taskpool.terminate(); + taskpool.shutdown(); taskWakeupMgr.stop(); taskWakeupMgr.cancelAll(); @@ -1678,7 +1693,21 @@ class TxnManagerImpl /*extends RemoteSer initLogger.log(Level.FINEST, "Terminating settlerpool."); } try { - settlerpool.terminate(); + settlerpool.shutdown(); + try { + if (!settlerpool.awaitTermination(1, TimeUnit.MINUTES)){ + if(initLogger.isLoggable(Level.FINEST)) { + initLogger.log(Level.FINEST, + "Termination of settlerpool timed out after 1 minute."); + } + }; + } catch (InterruptedException e){ + Thread.currentThread().interrupt();// restore interrupt. + if(initLogger.isLoggable(Level.FINEST)) { + initLogger.log(Level.FINEST, + "Thread interrupted while waiting for orderly settlerPool termination."); + } + } if (settlerWakeupMgr != null) { if(initLogger.isLoggable(Level.FINEST)) { initLogger.log(Level.FINEST, @@ -1700,7 +1729,7 @@ class TxnManagerImpl /*extends RemoteSer initLogger.log(Level.FINEST,"Terminating taskpool."); } try { - taskpool.terminate(); + taskpool.shutdown(); if (taskWakeupMgr != null) { if(initLogger.isLoggable(Level.FINEST)) { initLogger.log(Level.FINEST, @@ -1821,4 +1850,20 @@ class TxnManagerImpl /*extends RemoteSer // Extract the txn id from the lower bits of the uuid return Long.valueOf(uuid.getLeastSignificantBits()); } + + private static class FutureFactory implements RunnableFutureFactory{ + + @Override + public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) { + if (r instanceof RunnableFuture) return (RunnableFuture<T>) r; + return new FutureTask<T>(r, value); + } + + @Override + public <T> RunnableFuture<T> newTaskFor(Callable<T> c) { + if (c instanceof RunnableFuture) return (RunnableFuture<T>) c; + return new FutureTask<T>(c); + } + + } } Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImplInitializer.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImplInitializer.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImplInitializer.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImplInitializer.java Fri Jan 3 12:01:35 2014 @@ -40,6 +40,10 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import net.jini.activation.ActivationExporter; import net.jini.config.Configuration; @@ -53,6 +57,7 @@ import net.jini.jeri.BasicJeriExporter; import net.jini.jeri.tcp.TcpServerEndpoint; import net.jini.security.BasicProxyPreparer; import net.jini.security.ProxyPreparer; +import org.apache.river.impl.thread.NamedThreadFactory; /** * @@ -75,9 +80,9 @@ class TxnManagerImplInitializer { LeasePeriodPolicy txnLeasePeriodPolicy = null; String persistenceDirectory = null; JoinStateManager joinStateManager = null; - TaskManager settlerpool = null; + ExecutorService settlerpool = null; WakeupManager settlerWakeupMgr = null; - TaskManager taskpool = null; + ExecutorService taskpool = null; WakeupManager taskWakeupMgr = null; Uuid topUuid = null; AccessControlContext context = null; @@ -197,8 +202,34 @@ class TxnManagerImplInitializer { // Used by log recovery logic settlerWakeupMgr = new WakeupManager(new WakeupManager.ThreadDesc(null, true)); taskWakeupMgr = new WakeupManager(new WakeupManager.ThreadDesc(null, true)); - settlerpool = (TaskManager) Config.getNonNullEntry(config, TxnManager.MAHALO, "settlerPool", TaskManager.class, new TaskManager(settlerthreads, settlertimeout, settlerload)); - taskpool = (TaskManager) Config.getNonNullEntry(config, TxnManager.MAHALO, "taskPool", TaskManager.class, new TaskManager(taskthreads, tasktimeout, taskload)); + settlerpool = Config.getNonNullEntry( + config, + TxnManager.MAHALO, + "settlerPool", + ExecutorService.class, + new ThreadPoolExecutor( + 1, + settlerthreads, + settlertimeout, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("TxnMgr settlerPool", false) + ) + ); + taskpool = Config.getNonNullEntry( + config, + TxnManager.MAHALO, + "taskPool", + ExecutorService.class, + new ThreadPoolExecutor( + 1, + taskthreads, + tasktimeout, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("TxnMgr taskPool", false) + ) + ); if (TxnManagerImpl.initLogger.isLoggable(Level.FINEST)) { TxnManagerImpl.initLogger.log(Level.FINEST, "Recovering state"); } Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerTransaction.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerTransaction.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerTransaction.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerTransaction.java Fri Jan 3 12:01:35 2014 @@ -36,6 +36,7 @@ import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import java.util.Vector; +import java.util.concurrent.ExecutorService; import net.jini.core.lease.UnknownLeaseException; import net.jini.core.transaction.CannotAbortException; @@ -71,8 +72,18 @@ import net.jini.security.ProxyPreparer; class TxnManagerTransaction implements TransactionConstants, TimeConstants, LeasedResource { - static final long serialVersionUID = -2088463193687796098L; - + /* Serialization was removed to address Sun Bug ID:4912745 + * The bug has since been removed from the sun database by Oracle. + * + * While this class implemented Serializable in Jini 2.0 some of its + * instance variables were not serializable. + * + * Serialization was required for Replication, perhaps Distributed would + * be more suited to replication since it doesn't share the limitations + * of serialization. + * + * static final long serialVersionUID = -2088463193687796098L; + */ /* * Table of valid state transitions which a * TransactionManager may make. @@ -180,7 +191,7 @@ class TxnManagerTransaction * * @serial */ - private final TaskManager threadpool; + private final ExecutorService threadpool; /** * @serial @@ -265,7 +276,7 @@ class TxnManagerTransaction * unsettled. */ TxnManagerTransaction(TransactionManager mgr, LogManager logmgr, long id, - TaskManager threadpool, WakeupManager wm, TxnSettler settler, + ExecutorService threadpool, WakeupManager wm, TxnSettler settler, Uuid uuid) { if (logmgr == null) Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImpl.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImpl.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImpl.java Fri Jan 3 12:01:35 2014 @@ -2061,7 +2061,7 @@ class MailboxImpl implements MailboxBack boolean exists = pendingReg.remove(regID); NotifyTask task = (NotifyTask)activeReg.remove(regID); if (task != null) { // cancel active task, if any - task.cancel(); + task.cancel(false); if(deliveryLogger.isLoggable(Level.FINEST)) { deliveryLogger.log(Level.FINEST, "Cancelling active notification task for {0}", regID); @@ -2488,7 +2488,7 @@ class MailboxImpl implements MailboxBack boolean inPending = pendingReg.remove(uuid); NotifyTask task = (NotifyTask)activeReg.remove(uuid); if (task != null) { // cancel active task, if any - task.cancel(); + task.cancel(false); if(deliveryLogger.isLoggable(Level.FINEST)) { deliveryLogger.log(Level.FINEST, "Cancelling active notification task for {0}", uuid); Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImplInit.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImplInit.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImplInit.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImplInit.java Fri Jan 3 12:01:35 2014 @@ -332,7 +332,7 @@ class MailboxImplInit { MailboxImpl.NotifyTask task = activeReg.remove(uuid); if (task != null) { // cancel active task, if any - task.cancel(); + task.cancel(false); if (MailboxImpl.deliveryLogger.isLoggable(Level.FINEST)) { MailboxImpl.deliveryLogger.log(Level.FINEST, "Cancelling active notification task for {0}", uuid); } Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Txn.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Txn.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Txn.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Txn.java Fri Jan 3 12:01:35 2014 @@ -478,8 +478,7 @@ class Txn implements TransactableMgr, Tr */ private void cleanup() { if (monitorTask != null) - monitorTask.cancel(); // stop doing this - } + monitorTask.cancel(false); } // ----------------------------------- // Methods required by StorableObject Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java Fri Jan 3 12:01:35 2014 @@ -62,10 +62,18 @@ import com.sun.jini.constants.TimeConsta * @see WakeupManager */ import com.sun.jini.thread.WakeupManager.Ticket; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -public abstract class RetryTask implements Runnable, TimeConstants { +/** + * + * @param <V> + */ +public abstract class RetryTask<V> implements RunnableFuture<V>, TimeConstants { private final TaskManager manager; // the TaskManager for this task private final ExecutorService executor; private volatile RetryTime retry; // the retry object for this task @@ -226,19 +234,22 @@ public abstract class RetryTask implemen * unless a subclass overrides this to do so. Any override of this * method should invoke <code>super.cancel()</code>. */ - public void cancel() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { cancelled = true; Ticket ticket = this.ticket; if (ticket != null) wakeup.cancel(ticket); synchronized (this) { notifyAll(); // see waitFor() } + return true; } /** * Return <code>true</code> if <code>cancel</code> has been invoked. */ - public boolean cancelled() { + @Override + public boolean isCancelled() { return cancelled; } @@ -246,25 +257,41 @@ public abstract class RetryTask implemen * Return <code>true</code> if <code>tryOnce</code> has returned * successfully. */ - public boolean complete() { + @Override + public boolean isDone() { return complete; } - public boolean waitFor() throws InterruptedException { + public boolean waitFor(long duration) throws InterruptedException { while (!cancelled && !complete) synchronized (this){ - wait(); + if (duration == 0 )wait(); + else wait(duration); } return complete; } + + @Override + public V get() throws InterruptedException, ExecutionException { + waitFor(0L); + return null; + } + + @Override + public V get(long time, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException + { + waitFor(unit.toMillis(time)); + return null; + } /** * Reset values for a new use of this task. */ public final void reset() { - cancel(); // remove from the wakeup queue + cancel(false); // remove from the wakeup queue startTime = System.currentTimeMillis(); cancelled = false; complete = false; Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java Fri Jan 3 12:01:35 2014 @@ -41,12 +41,15 @@ import java.util.Map; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -62,6 +65,8 @@ import net.jini.core.discovery.LookupLoc import net.jini.core.lookup.ServiceRegistrar; import net.jini.security.BasicProxyPreparer; import net.jini.security.ProxyPreparer; +import org.apache.river.impl.thread.ExtensibleExecutorService; +import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory; import org.apache.river.impl.thread.NamedThreadFactory; /** @@ -109,7 +114,6 @@ abstract class AbstractLookupLocatorDisc * <code>ExecutorService</code>. */ private final ExecutorService discoveryExecutor; - private final Set<RetryTask> discoveryTasks; /** Wakeup manager for the discovery tasks. For any locator, after * an initial failure to discover the locator, the task used to * perform all future discovery attempts is managed by this @@ -332,9 +336,6 @@ abstract class AbstractLookupLocatorDisc DiscoveryTask task = new DiscoveryTask(LocatorReg.this, discoveryExecutor, discoveryWakeupMgr); discoveryExecutor.submit(task); - synchronized (discoveryTasks){ - discoveryTasks.add(task); - } } } ); @@ -342,9 +343,6 @@ abstract class AbstractLookupLocatorDisc DiscoveryTask task = new DiscoveryTask(this, discoveryExecutor, discoveryWakeupMgr); discoveryExecutor.submit(task); - synchronized (discoveryTasks){ - discoveryTasks.add(task); - } } } @@ -586,11 +584,28 @@ abstract class AbstractLookupLocatorDisc private AbstractLookupLocatorDiscovery(Initializer init){ registrarPreparer = init.registrarPreparer; - discoveryExecutor = init.discoveryTaskMgr; - discoveryTasks = Collections.newSetFromMap(new WeakHashMap<RetryTask,Boolean>()); + discoveryExecutor = new ExtensibleExecutorService( + init.discoveryTaskMgr, + new FutureFactory() + ); discoveryWakeupMgr = init.discoveryWakeupMgr; initialUnicastDelayRange = init.initialUnicastDelayRange.longValue(); } + + private static class FutureFactory implements RunnableFutureFactory { + + @Override + public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) { + if (r instanceof RunnableFuture) return (RunnableFuture<T>) r; + return new FutureTask<T>(r, value); + } + + @Override + public <T> RunnableFuture<T> newTaskFor(Callable<T> c) { + return new FutureTask<T>(c); + } + + } /** * Add a DiscoveryListener to the listener set. The listener's @@ -1178,22 +1193,12 @@ abstract class AbstractLookupLocatorDisc }//endif /* Cancel/remove pending tasks from the task manager and terminate */ if(discoveryExecutor != null) { - synchronized (discoveryTasks){ - Iterator<RetryTask> it = discoveryTasks.iterator(); - while (it.hasNext()){ - it.next().cancel();//cancel wakeup ticket - } - } List<Runnable> pendingTasks = discoveryExecutor.shutdownNow(); Iterator<Runnable> ir = pendingTasks.iterator(); while (ir.hasNext()){ Runnable pendingTask = ir.next(); - if (pendingTask instanceof RetryTask){ - ((RetryTask) pendingTask).cancel();//cancel wakeup ticket - System.err.println("Cancelled RetryTask"); - } else if (pendingTask instanceof Future) { - ((Future) pendingTask).cancel(true); - System.err.println("Task not instanceof RetryTask: " + pendingTask); + if (pendingTask instanceof Future) { + ((Future) pendingTask).cancel(false); } } }//endif Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java Fri Jan 3 12:01:35 2014 @@ -2648,7 +2648,7 @@ public class JoinManager { synchronized(proxyReg.taskList) { if(proxyReg.proxyRegTask != null) { taskMgr.remove(proxyReg.proxyRegTask); - proxyReg.proxyRegTask.cancel();//cancel retry in WakeupMgr + proxyReg.proxyRegTask.cancel(false); proxyReg.proxyRegTask = null; //don't reuse because of seq# }//endif proxyReg.taskList.clear(); @@ -2671,7 +2671,7 @@ public class JoinManager { ArrayList pendingTasks = taskMgr.getPending(); for(int i=0;i<pendingTasks.size();i++) { RetryTask pendingTask = (RetryTask)pendingTasks.get(i); - pendingTask.cancel();//cancel wakeup ticket + pendingTask.cancel(false); taskMgr.remove(pendingTask);//remove from task mgr }//end loop /* Interrupt all active tasks, prepare taskMgr for GC. */ Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/net/RFC3986URLClassLoader.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/net/RFC3986URLClassLoader.java?rev=1555061&r1=1555060&r2=1555061&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/net/RFC3986URLClassLoader.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/net/RFC3986URLClassLoader.java Fri Jan 3 12:01:35 2014 @@ -118,15 +118,15 @@ public class RFC3986URLClassLoader exten registerAsParallelCapable.setAccessible(true); registerAsParallelCapable.invoke(null, new Object [0]); } catch (NoSuchMethodException ex) { - logger.log(Level.INFO, "Platform doesn't support parallel class loading", ex); + logger.log(Level.FINEST, "Platform doesn't support parallel class loading", ex); } catch (SecurityException ex) { - logger.log(Level.INFO, "Insufficient permission to enable parallel class loading, disabled", ex); + logger.log(Level.FINEST, "Insufficient permission to enable parallel class loading, disabled", ex); } catch (IllegalAccessException ex) { - logger.log(Level.INFO, "Unable to invoke parallel class loading", ex); + logger.log(Level.FINEST, "Unable to invoke parallel class loading", ex); } catch (IllegalArgumentException ex) { - logger.log(Level.INFO, "Unable to invoke parallel class loading", ex); + logger.log(Level.FINEST, "Unable to invoke parallel class loading", ex); } catch (InvocationTargetException ex) { - logger.log(Level.INFO, "Unable to invoke parallel class loading", ex); + logger.log(Level.FINEST, "Unable to invoke parallel class loading", ex); } String codebaseAnnotationProperty = null; String prop = AccessController.doPrivileged( Added: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ExtensibleExecutorService.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ExtensibleExecutorService.java?rev=1555061&view=auto ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ExtensibleExecutorService.java (added) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/ExtensibleExecutorService.java Fri Jan 3 12:01:35 2014 @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.river.impl.thread; + +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; + +/** + * AbstractExecutorService introduced two protected methods in Java 1.6 to + * allow an ExecutorService to use customised RunnableFuture implementations + * other than the default FutureTask. + * + * This class requires a Factory to create the RunnableFuture, encapsulating + * any existing ExecutorService. + * + * This allows an ExecutorService to be provided by Configuration or a pool + * to be shared without requiring that all implementations also share the same + * type of RunnableFuture<T>. + * + * @author Peter Firmstone + */ +public class ExtensibleExecutorService extends AbstractExecutorService { + + private final ExecutorService executor; + private final RunnableFutureFactory factory; + + public ExtensibleExecutorService(ExecutorService executor, RunnableFutureFactory factory){ + this.executor = executor; + this.factory = factory; + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Runnable r, T value){ + return factory.newTaskFor(r, value); + } + + @Override + protected <T> RunnableFuture<T> newTaskFor(Callable<T> c){ + return factory.newTaskFor(c); + } + + @Override + public void shutdown() { + executor.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + return executor.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return executor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executor.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit); + } + + @Override + public void execute(Runnable command) { + executor.execute(command); + } + + /** + * Factory for creating custom RunnableFuture implementations. + */ + public interface RunnableFutureFactory { + /** + * Returns a <tt>RunnableFuture</tt> for the given runnable and default + * value. + * + * @param runnable the runnable task being wrapped + * @param value the default value for the returned future + * @return a <tt>RunnableFuture</tt> which when run will run the + * underlying runnable and which, as a <tt>Future</tt>, will yield + * the given value as its result and provide for cancellation of + * the underlying task. + */ + public <T> RunnableFuture<T> newTaskFor(Runnable r, T value); + /** + * Returns a <tt>RunnableFuture</tt> for the given callable task. + * + * @param callable the callable task being wrapped + * @return a <tt>RunnableFuture</tt> which when run will call the + * underlying callable and which, as a <tt>Future</tt>, will yield + * the callable's result as its result and provide for + * cancellation of the underlying task. + */ + public <T> RunnableFuture<T> newTaskFor(Callable<T> c); + } + +}
