Author: peter_firmstone Date: Sat Jan 11 21:23:28 2014 New Revision: 1557469
URL: http://svn.apache.org/r1557469 Log: JoinManager - replace TaskManager with ExecutorService Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java?rev=1557469&r1=1557468&r2=1557469&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java Sat Jan 11 21:23:28 2014 @@ -20,10 +20,8 @@ package net.jini.lookup; import com.sun.jini.constants.ThrowableConstants; import com.sun.jini.lookup.entry.LookupAttributes; import com.sun.jini.thread.RetryTask; -import com.sun.jini.thread.TaskManager; import com.sun.jini.thread.WakeupManager; import com.sun.jini.logging.LogUtil; -import com.sun.jini.thread.TaskManager.Task; import net.jini.config.Configuration; import net.jini.config.ConfigurationException; @@ -50,13 +48,24 @@ import net.jini.core.lookup.ServiceRegis import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.river.api.util.FutureObserver; +import org.apache.river.impl.thread.ExtensibleExecutorService; +import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory; +import org.apache.river.impl.thread.NamedThreadFactory; /** * A goal of any well-behaved service is to advertise the facilities and @@ -441,6 +450,23 @@ import java.util.logging.Logger; * @see java.util.logging.Logger */ public class JoinManager { + + /** + * executorService requires a PriorityBlockingQueue, this is the + * comparator for that queue. + */ + public static class ExecutorQueueComparator implements Comparator { + + @Override + public int compare(Object o1, Object o2) { + int one = ((ProxyRegTask)o1).getSeqN(); + int two = ((ProxyRegTask)o2).getSeqN(); + if (one < two) return -1; + if (one > two) return 1; + return 0; + } + + } /** Implementation Note: * @@ -564,7 +590,7 @@ public class JoinManager { */ /** Abstract base class from which all of the task classes are derived. */ - private class ProxyRegTask extends RetryTask implements Task { + private class ProxyRegTask extends RetryTask { private final long[] sleepTime = { 5*1000, 10*1000, 15*1000, 20*1000, 25*1000, 30*1000 }; // volatile fields only mutated while synchronized on proxyReg.taskList @@ -679,22 +705,22 @@ public class JoinManager { * @param tasks the tasks with which to compare the current task * @param size elements with index less than size are considered */ - public boolean runAfter(List tasks, int size) { - /* If the service's ID has already been set, then it's okay - * to run all ProxyRegTask's in parallel, otherwise, the - * ProxyRegTask with the lowest sequence number should be run. - */ - if(serviceItem.serviceID != null) return false; - /* For task with lowest seq #, run it now; else run it later */ - for(int i=0; i<size; i++) { - Object t = tasks.get(i); - if (t instanceof ProxyRegTask){ - int nextTaskSeqN = ((ProxyRegTask)t).getSeqN(); - if( seqN > nextTaskSeqN ) return true; - } - }//end loop - return false; - }//end runAfter +// public boolean runAfter(List tasks, int size) { +// /* If the service's ID has already been set, then it's okay +// * to run all ProxyRegTask's in parallel, otherwise, the +// * ProxyRegTask with the lowest sequence number should be run. +// */ +// if(serviceItem.serviceID != null) return false; +// /* For task with lowest seq #, run it now; else run it later */ +// for(int i=0; i<size; i++) { +// Object t = tasks.get(i); +// if (t instanceof ProxyRegTask){ +// int nextTaskSeqN = ((ProxyRegTask)t).getSeqN(); +// if( seqN > nextTaskSeqN ) return true; +// } +// }//end loop +// return false; +// }//end runAfter /** Accessor method that returns the instance of <code>ProxyReg</code> * (the lookup service) associated with the task represented by @@ -1019,7 +1045,8 @@ public class JoinManager { * service to discover, and with which this join manager's service * should be registered. */ - private class ProxyReg { + private class ProxyReg implements FutureObserver{ + /** Class that is registered as a listener with this join manager's * lease renewal manager. That lease renewal manager manages the * lease granted to this join manager's associated service by the @@ -1165,11 +1192,13 @@ public class JoinManager { /** The set of sub-tasks that are to be executed in order for the * lookup service associated with the current instance of this class. */ - final List<JoinTask> taskList = new ArrayList<JoinTask>(1); + final List<JoinTask> taskList = new ArrayList<JoinTask>(); /** The instance of <code>DiscLeaseListener</code> that is registered * with the lease renewal manager that handles the lease of this join * manger's service. */ + final List<Future> runningTasks = new ArrayList<Future>(); + private final DiscLeaseListener dListener = new DiscLeaseListener(); /** Constructor that associates this class with the lookup service @@ -1183,7 +1212,24 @@ public class JoinManager { if(proxy == null) throw new IllegalArgumentException ("proxy can't be null"); this.proxy = proxy; - }//end constructor + }//end constructor + + @Override + public void futureCompleted(Future e) { + synchronized (runningTasks){ + runningTasks.remove(e); + } + } + + public void terminate(){ + synchronized (runningTasks){ + Iterator<Future> it = runningTasks.iterator(); + while (it.hasNext()){ + it.next().cancel(false); + } + runningTasks.clear(); + } + } /** Convenience method that adds new sub-tasks to this class' * task queue. @@ -1192,13 +1238,18 @@ public class JoinManager { */ public void addTask(JoinTask task) { if(bTerminated) return; + Future future = null; synchronized(taskList) { taskList.add(task); if(this.proxyRegTask == null) { this.proxyRegTask = new ProxyRegTask(this,taskSeqN++); - taskMgr.add(this.proxyRegTask); + this.proxyRegTask.addObserver(this); + future = taskMgr.submit(this.proxyRegTask); }//endif }//end sync(taskList) + synchronized (runningTasks){ + runningTasks.add(future); + } }//end addTask /** Registers the service associated with this join manager with the @@ -1438,7 +1489,7 @@ public class JoinManager { * "backoff strategy") - the re-execution of each failed task in this * <code>TaskManager</code>. */ - private final TaskManager taskMgr; + private final ExecutorService taskMgr; /** Maximum number of times a failed task is allowed to be re-executed. */ private final int maxNRetries; /** Wakeup manager for the various tasks executed by this join manager. @@ -2426,7 +2477,7 @@ public class JoinManager { ProxyPreparer registrarPreparer; ProxyPreparer registrationPreparer; ProxyPreparer serviceLeasePreparer; - TaskManager taskManager; + ExecutorService taskManager; WakeupManager wakeupManager; Integer maxNretrys; LeaseRenewalManager leaseRenewalManager; @@ -2437,7 +2488,7 @@ public class JoinManager { Conf ( ProxyPreparer registrarPreparer, ProxyPreparer registrationPreparer, ProxyPreparer serviceLeasePreparer, - TaskManager taskManager, + ExecutorService taskManager, WakeupManager wakeupManager, Integer maxNretrys, LeaseRenewalManager leaseRenewalManager, @@ -2512,29 +2563,36 @@ public class JoinManager { /* Retrieve configuration items if applicable */ if(config == null) throw new NullPointerException("config is null"); /* Proxy preparers */ - ProxyPreparer registrarPreparer = (ProxyPreparer)config.getEntry + ProxyPreparer registrarPreparer = config.getEntry (COMPONENT_NAME, "registrarPreparer", ProxyPreparer.class, new BasicProxyPreparer()); - ProxyPreparer registrationPreparer = (ProxyPreparer)config.getEntry + ProxyPreparer registrationPreparer = config.getEntry (COMPONENT_NAME, "registrationPreparer", ProxyPreparer.class, new BasicProxyPreparer()); - ProxyPreparer serviceLeasePreparer = (ProxyPreparer)config.getEntry + ProxyPreparer serviceLeasePreparer = config.getEntry (COMPONENT_NAME, "serviceLeasePreparer", ProxyPreparer.class, new BasicProxyPreparer()); /* Task manager */ - TaskManager taskMgr; + ExecutorService taskMgr; try { taskMgr = config.getEntry(COMPONENT_NAME, - "taskManager", - TaskManager.class); + "executorService", + ExecutorService.class); } catch(NoSuchEntryException e) { /* use default */ - taskMgr = new TaskManager(MAX_N_TASKS,(15*1000),1.0f); + taskMgr = new ThreadPoolExecutor( + 1, + MAX_N_TASKS, + 15, + TimeUnit.SECONDS, + new PriorityBlockingQueue(100, new ExecutorQueueComparator()), + new NamedThreadFactory("JoinManager executor thread", false) + ); } /* Wakeup manager */ WakeupManager wakeupMgr; @@ -2547,7 +2605,7 @@ public class JoinManager { (new WakeupManager.ThreadDesc(null,true)); } /* Max number of times to re-schedule tasks in thru wakeup manager */ - Integer maxNRetries = (config.getEntry + int maxNRetries = (config.getEntry (COMPONENT_NAME, "wakeupRetries", int.class, @@ -2555,7 +2613,7 @@ public class JoinManager { /* Lease renewal manager */ if(leaseMgr == null) { try { - leaseMgr = (LeaseRenewalManager)config.getEntry + leaseMgr = config.getEntry (COMPONENT_NAME, "leaseManager", LeaseRenewalManager.class); @@ -2563,7 +2621,7 @@ public class JoinManager { leaseMgr = new LeaseRenewalManager(config); } }//endif - Long renewalDuration = (config.getEntry + long renewalDuration = (config.getEntry (COMPONENT_NAME, "maxLeaseDuration", long.class, @@ -2606,7 +2664,22 @@ public class JoinManager { registrarPreparer = conf.registrarPreparer; registrationPreparer = conf.registrationPreparer; serviceLeasePreparer = conf.serviceLeasePreparer; - taskMgr = conf.taskManager; + taskMgr = new ExtensibleExecutorService(conf.taskManager, + new RunnableFutureFactory(){ + + @Override + public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) { + if (r instanceof ProxyRegTask) return (RunnableFuture<T>) r; + throw new IllegalStateException("Runnable not instance of ProxyRegTask"); + } + + @Override + public <T> RunnableFuture<T> newTaskFor(Callable<T> c) { + if (c instanceof ProxyRegTask) return (RunnableFuture<T>) c; + throw new IllegalStateException("Callable not instance of ProxyRegTask"); + } + + }); wakeupMgr = conf.wakeupManager; maxNRetries = conf.maxNretrys; leaseRenewalMgr = conf.leaseRenewalManager; @@ -2648,12 +2721,12 @@ public class JoinManager { if(taskMgr == null) return; synchronized(proxyReg.taskList) { if(proxyReg.proxyRegTask != null) { - taskMgr.remove(proxyReg.proxyRegTask); proxyReg.proxyRegTask.cancel(false); proxyReg.proxyRegTask = null; //don't reuse because of seq# }//endif proxyReg.taskList.clear(); }//end sync(proxyReg.taskList) + proxyReg.terminate(); }//end removeTasks /** Removes from the task manager, all pending tasks regardless of the @@ -2667,22 +2740,8 @@ public class JoinManager { wakeupMgr.cancelAll();//cancel all tickets wakeupMgr.stop();//stop execution of the wakeup manager } - synchronized(taskMgr) { - /* Remove all pending tasks */ - ArrayList pendingTasks = taskMgr.getPending(); - for(int i=0;i<pendingTasks.size();i++) { - RetryTask pendingTask = (RetryTask)pendingTasks.get(i); - pendingTask.cancel(false); - taskMgr.remove(pendingTask);//remove from task mgr - }//end loop - /* Interrupt all active tasks, prepare taskMgr for GC. */ - taskMgr.terminate(); - } - // Too lazy to put out the trash. -// taskMgr = null; -// }//end sync(taskMgr) -// wakeupMgr = null; -// }//end sync(wakeupMgr) + /* Interrupt all active tasks, prepare taskMgr for GC. */ + taskMgr.shutdownNow(); }//end terminateTaskMgr /** Examines the elements of the input set and, upon finding at least one
