Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java?rev=1554723&r1=1554722&r2=1554723&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java Thu Jan 2 02:45:07 2014 @@ -26,19 +26,29 @@ import com.sun.jini.discovery.internal.M import com.sun.jini.logging.Levels; import com.sun.jini.logging.LogUtil; import com.sun.jini.thread.RetryTask; -import com.sun.jini.thread.TaskManager; import com.sun.jini.thread.WakeupManager; import java.io.IOException; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.Socket; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import net.jini.config.Configuration; @@ -52,6 +62,7 @@ import net.jini.core.discovery.LookupLoc import net.jini.core.lookup.ServiceRegistrar; import net.jini.security.BasicProxyPreparer; import net.jini.security.ProxyPreparer; +import org.apache.river.impl.thread.NamedThreadFactory; /** * This package private superclass of LookupLocatorDiscovery exists for @@ -88,16 +99,17 @@ abstract class AbstractLookupLocatorDisc } } - /** Task manager for the discovery tasks. On the first attempt to + /** ExecutorService for the discovery tasks. On the first attempt to * discover each locator, the tasks used to perform those discoveries - * are managed by this <code>TaskManager</code> so that the number of + * are managed by this <code>ExecutorService</code> so that the number of * concurrent threads can be bounded. If one or more of those attempts * fails, a <code>WakeupManager</code> is used (through the use of a * <code>RetryTask</code>) to schedule - at a later time (employing a * "backoff strategy") - the re-execution of each failed task in this - * <code>TaskManager</code>. + * <code>ExecutorService</code>. */ - private final TaskManager discoveryTaskMgr; + private final ExecutorService discoveryExecutor; + private final Set<RetryTask> discoveryTasks; /** Wakeup manager for the discovery tasks. For any locator, after * an initial failure to discover the locator, the task used to * perform all future discovery attempts is managed by this @@ -111,17 +123,17 @@ abstract class AbstractLookupLocatorDisc */ private final WakeupManager discoveryWakeupMgr; /** Stores LookupLocators that have not been discovered yet. */ - private final HashSet undiscoveredLocators = new HashSet(11); // sync(this) + private final Set<LocatorReg> undiscoveredLocators = Collections.newSetFromMap(new ConcurrentHashMap<LocatorReg,Boolean>()); /** Stores LookupLocators that have been discovered */ - private final ArrayList discoveredLocators = new ArrayList(11); // sync(this) + private final List<LocatorReg> discoveredLocators = new CopyOnWriteArrayList<LocatorReg>(); /** Thread that handles pending notifications. */ - private Notifier notifierThread; // sync(pendingNotifies) + private final Notifier notifierThread = new Notifier(); /** Notifications to be sent to listeners. */ - private final LinkedList pendingNotifies = new LinkedList(); // sync(pendingNotifies) + private final BlockingDeque<NotifyTask> pendingNotifies = new LinkedBlockingDeque<NotifyTask>(); /** Stores DiscoveryListeners **/ - private final ArrayList listeners = new ArrayList(1); // sync(this) + private final List<DiscoveryListener> listeners = new CopyOnWriteArrayList<DiscoveryListener>(); // sync(this) only for add and remove to avoid duplicates. /** Flag indicating whether or not this class is still functional. */ - private boolean terminated = false; // sync(this) + private volatile boolean terminated = false; // sync(this) /* Preparer for the proxies to the lookup services that are discovered * and used by this utility. */ @@ -130,7 +142,7 @@ abstract class AbstractLookupLocatorDisc * protocol. * TODO: investigate why this field isn't used. */ - private Discovery protocol2 = Discovery.getProtocol2(null); + private final Discovery protocol2 = Discovery.getProtocol2(null); /* * Controls how long to wait before attempting unicast discovery, on * startup. @@ -140,7 +152,7 @@ abstract class AbstractLookupLocatorDisc * Flag which indicates if discoverLocators was called during * initialUnicastDelayRange delay. */ - private boolean discoverLocatorsCalled = false; // sync(this) + private volatile boolean discoverLocatorsCalled = false; /** Wrapper class in which each instance corresponds to a lookup service * to discover via unicast discovery. */ @@ -317,16 +329,22 @@ abstract class AbstractLookupLocatorDisc System.currentTimeMillis() + MIN_RETRY, new Runnable() { public void run() { - discoveryTaskMgr.add - (new DiscoveryTask(LocatorReg.this, - discoveryTaskMgr, discoveryWakeupMgr)); + DiscoveryTask task = new DiscoveryTask(LocatorReg.this, + discoveryExecutor, discoveryWakeupMgr); + discoveryExecutor.submit(task); + synchronized (discoveryTasks){ + discoveryTasks.add(task); + } } } ); } else { - discoveryTaskMgr.add - (new DiscoveryTask(this, - discoveryTaskMgr, discoveryWakeupMgr)); + DiscoveryTask task = new DiscoveryTask(this, + discoveryExecutor, discoveryWakeupMgr); + discoveryExecutor.submit(task); + synchronized (discoveryTasks){ + discoveryTasks.add(task); + } } } @@ -359,12 +377,12 @@ abstract class AbstractLookupLocatorDisc /** Data structure containing task data processed by the Notifier Thread */ private static class NotifyTask { /** The listeners to notify */ - public final ArrayList listeners; + public final List listeners; /** Map of discovered registrars to groups in which each is a member */ public final Map groupsMap; /** True if discarded, else discovered */ public final boolean discard; - public NotifyTask(ArrayList listeners, + public NotifyTask(List listeners, Map groupsMap, boolean discard) { @@ -381,23 +399,28 @@ abstract class AbstractLookupLocatorDisc * Only 1 instance of this thread is run. */ private class Notifier extends Thread { + // In case client code catches and resets interrupt. + private volatile boolean interrupted = false; /** Construct a daemon thread */ public Notifier() { super("event notifier"); setDaemon(true); }//end constructor + + public void interrupt(){ + super.interrupt(); + interrupted = true; + } public void run() { logger.finest("LookupLocatorDiscovery - Notifier thread started"); - while (true) { + while (!interrupted) { NotifyTask task; - synchronized (pendingNotifies) { - if (pendingNotifies.isEmpty()) { - notifierThread = null; - return; - }//endif - task = (NotifyTask)pendingNotifies.removeFirst(); - }//end sync(pendingNotifies) + try { + task = pendingNotifies.takeFirst(); // Blocks waiting for food. + } catch (InterruptedException ex) { + return; + } boolean firstListener = true; for(Iterator iter = task.listeners.iterator();iter.hasNext();){ DiscoveryListener l = (DiscoveryListener)iter.next(); @@ -409,23 +432,26 @@ abstract class AbstractLookupLocatorDisc String eType = (task.discard ? "discarded":"discovered"); ServiceRegistrar[] regs = e.getRegistrars(); - logger.finest(eType+" event -- "+regs.length - +" lookup(s)"); + int len = regs.length; + logger.log(Level.FINEST, "{0} event -- {1} lookup(s)", + new Object[]{eType, len}); Map groupsMap = e.getGroups(); - for(int i=0;i<regs.length;i++) { + for(int i=0;i<len;i++) { LookupLocator loc = null; try { loc = regs[i].getLocator(); } catch (Throwable ex) { /* ignore */ } String[] groups = (String[])groupsMap.get(regs[i]); - logger.finest(" "+eType+" locator = "+loc); + logger.log(Level.FINEST, " {0} locator = {1}", + new Object[]{eType, loc}); if(groups.length == 0) { - logger.finest(" "+eType - +" group = NO_GROUPS"); + logger.log(Level.FINEST, + " {0} group = NO_GROUPS", eType); } else { for(int j=0;j<groups.length;j++) { - logger.finest(" "+eType+" group["+j+"] " - +"= "+groups[j]); + logger.log(Level.FINEST, + " {0} group[{1}] = {2}", + new Object[]{eType, j, groups[j]}); }//end loop }//endif(groups.length) }//end loop @@ -459,10 +485,10 @@ abstract class AbstractLookupLocatorDisc private class DiscoveryTask extends RetryTask { private final LocatorReg reg; public DiscoveryTask(LocatorReg reg, - TaskManager taskMgr, + ExecutorService executor, WakeupManager wakeupMgr) { - super(taskMgr,wakeupMgr); + super(executor,wakeupMgr); this.reg = reg; }//end constructor @@ -479,45 +505,38 @@ abstract class AbstractLookupLocatorDisc */ public boolean tryOnce() { logger.finest("LookupLocatorDiscovery - DiscoveryTask started"); - synchronized(AbstractLookupLocatorDiscovery.this) { - if (terminated) { - return true; - } - /* Locators may have been removed (ex. removeLocators or - * setLocators) between the time they were added to the map, - * and the time this task is finally executed. Determine if - * this task should continue. - */ - if( undiscoveredLocators.isEmpty() ) { - logger.finest("LookupLocatorDiscovery - DiscoveryTask " - +"completed"); - return true;//true ==> done. Don't queue retry. - }//endif - if(!undiscoveredLocators.contains(reg)) { - logger.finest("LookupLocatorDiscovery - DiscoveryTask " - +"completed"); - return true;//already removed, true ==> don't queue retry - } - }//end sync(LookupLocatorDiscovery.this) + if (terminated) return true; + /* Locators may have been removed (ex. removeLocators or + * setLocators) between the time they were added to the map, + * and the time this task is finally executed. Determine if + * this task should continue. + */ + if( undiscoveredLocators.isEmpty() ) { + logger.finest("LookupLocatorDiscovery - DiscoveryTask " + +"completed"); + return true;//true ==> done. Don't queue retry. + }//endif + if(!undiscoveredLocators.contains(reg)) { + logger.finest("LookupLocatorDiscovery - DiscoveryTask " + +"completed"); + return true;//already removed, true ==> don't queue retry + } /* Use the unicast discovery protocol to perform the actual * discovery. Note that since this process involves remote, * interprocess (socket) communication, it is important that * this processing be performed outside of the sync block. */ boolean noRetry = regTryGetProxy(reg);//t -> done, f -> queue retry - synchronized (AbstractLookupLocatorDiscovery.this) { - if (terminated) { - return true; - } - if(noRetry) { - logger.finest("LookupLocatorDiscovery - DiscoveryTask " - +"completed"); - } else { - logger.finest("LookupLocatorDiscovery - DiscoveryTask " - +"failed, will retry later"); - }//endif - return noRetry; - } + if (terminated) return true; + + if(noRetry) { + logger.finest("LookupLocatorDiscovery - DiscoveryTask " + +"completed"); + } else { + logger.finest("LookupLocatorDiscovery - DiscoveryTask " + +"failed, will retry later"); + }//endif + return noRetry; }//end tryOnce @@ -531,14 +550,6 @@ abstract class AbstractLookupLocatorDisc return reg.getNextTryTime(); }//end retryTime - /** Returns true if current instance must be run after task(s) in - * task manager queue. - * @param tasks the tasks to consider. - * @param size elements with index less than size are considered. - */ - public boolean runAfter(java.util.List tasks, int size) { - return false; - }//end runAfter }//end class DiscoveryTask /** @@ -575,7 +586,8 @@ abstract class AbstractLookupLocatorDisc private AbstractLookupLocatorDiscovery(Initializer init){ registrarPreparer = init.registrarPreparer; - discoveryTaskMgr = init.discoveryTaskMgr; + discoveryExecutor = init.discoveryTaskMgr; + discoveryTasks = Collections.newSetFromMap(new WeakHashMap<RetryTask,Boolean>()); discoveryWakeupMgr = init.discoveryWakeupMgr; initialUnicastDelayRange = init.initialUnicastDelayRange.longValue(); } @@ -603,24 +615,21 @@ abstract class AbstractLookupLocatorDisc if(l == null) { throw new NullPointerException("can't add null listener"); } - synchronized(this) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - if(listeners.contains(l)) return; //already have this listener - listeners.add(l); - if(!discoveredLocators.isEmpty()) { - HashMap groupsMap = new HashMap(discoveredLocators.size()); - Iterator iter = discoveredLocators.iterator(); - for (int i = 0; iter.hasNext(); i++) { - LocatorReg reg = (LocatorReg)iter.next(); - groupsMap.put(reg.getProxy(), reg.getMemberGroups()); - }//end loop - ArrayList list = new ArrayList(1); - list.add(l); - addNotify(list, groupsMap, false); - }//endif - }//end sync + + if (terminated) throw new IllegalStateException("discovery terminated"); + synchronized (this){ // sync to make atomic with remove + if(listeners.contains(l)) return; //already have this listener + listeners.add(l); // Small window where it's possible to add duplicates. + } + Map groupsMap = new HashMap(discoveredLocators.size()); + Iterator<LocatorReg> iter = discoveredLocators.iterator(); + while (iter.hasNext()) { + LocatorReg reg = iter.next(); + groupsMap.put(reg.getProxy(), reg.getMemberGroups()); + }//end loop + List list = new ArrayList(1); + list.add(l); + if (!groupsMap.isEmpty()) addNotify(list, groupsMap, false); }//end addDiscoveryListener /** @@ -636,12 +645,11 @@ abstract class AbstractLookupLocatorDisc * * @see #addDiscoveryListener */ - public synchronized void removeDiscoveryListener(DiscoveryListener l) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); + public void removeDiscoveryListener(DiscoveryListener l) { + if (terminated) throw new IllegalStateException("discovery terminated"); + synchronized (this){ // sync to avoid duplicates, makes add atomic + listeners.remove(l); } - int index = listeners.indexOf(l); - if(index != -1) listeners.remove(index); }//end removeDiscoveryListener /** @@ -661,15 +669,9 @@ abstract class AbstractLookupLocatorDisc * @see net.jini.discovery.DiscoveryManagement#removeDiscoveryListener */ public ServiceRegistrar[] getRegistrars() { - synchronized(this) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - if((discoveredLocators == null) || (discoveredLocators.isEmpty())){ - return new ServiceRegistrar[0]; - } - return buildServiceRegistrar(); - }//end sync(this) + if (terminated) throw new IllegalStateException("discovery terminated"); + if(discoveredLocators.isEmpty()) return new ServiceRegistrar[0]; + return buildServiceRegistrar(); }//end getRegistrars /** @@ -691,17 +693,17 @@ abstract class AbstractLookupLocatorDisc * @see net.jini.discovery.DiscoveryManagement#discard */ public void discard(ServiceRegistrar proxy) { - synchronized(this) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - if(proxy == null) return; - LookupLocator lct = findRegFromProxy(proxy); - if(lct == null) return; - /* Remove locator from the set of already-discovered locators */ - LocatorReg reg = removeDiscoveredLocator(lct); + if (terminated) { + throw new IllegalStateException("discovery terminated"); + } + if(proxy == null) return; + LookupLocator lct = findRegFromProxy(proxy); + if(lct == null) return; + /* Remove locator from the set of already-discovered locators */ + LocatorReg reg = removeDiscoveredLocator(lct); + if (reg != null){ /* Prepare the information for the discarded event */ - HashMap groupsMap = new HashMap(1); + Map groupsMap = new HashMap(1); groupsMap.put(reg.getProxy(), reg.getMemberGroups()); /* Prepare the discarded locatorReg for re-discovery */ synchronized (reg){ @@ -709,12 +711,12 @@ abstract class AbstractLookupLocatorDisc reg.memberGroups = null; reg.delayNextTryTime(); } - addToMap(reg);//put discarded reg back in the not-discovered map + addAndQueueDiscoveryTaskIfAbsent(reg);//put discarded reg back in the not-discovered map /* Send a discarded event to all registered listeners */ - if(!listeners.isEmpty()) { - addNotify((ArrayList)listeners.clone(), groupsMap, true); - }//endif - }//end sync(this) + if(!listeners.isEmpty()) { + addNotify(listeners, groupsMap, true); + }//endif + } }//end discard /** @@ -727,13 +729,23 @@ abstract class AbstractLookupLocatorDisc * * @see net.jini.discovery.DiscoveryManagement#terminate */ - public synchronized void terminate() { - if(terminated) return; - terminated = true; + public void terminate() { + synchronized (this){ + if(terminated) return; + terminated = true; + } terminateTaskMgr(); - synchronized(pendingNotifies) { - pendingNotifies.clear(); - }//end sync + boolean interrupted = false; + // Don't leave DiscoveryListener's hanging. + while (!pendingNotifies.isEmpty()) { + try { + Thread.sleep(1000L); + } catch (InterruptedException ex) { + interrupted = true; + } + } + notifierThread.interrupt(); + if (interrupted) Thread.currentThread().interrupt(); }//end terminate /** @@ -757,27 +769,23 @@ abstract class AbstractLookupLocatorDisc * @see net.jini.discovery.DiscoveryLocatorManagement#getLocators * @see #setLocators */ - public synchronized LookupLocator[] getLocators() { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } + public LookupLocator[] getLocators() { + if (terminated) throw new IllegalStateException("discovery terminated"); /* Includes the set of already-discovered lookup services and * the set of not-yet-discovered lookup services. */ - int size = discoveredLocators.size() + undiscoveredLocators.size(); - LookupLocator[] ret = new LookupLocator[size]; + List<LookupLocator> locators = new LinkedList<LookupLocator>(); /* Retrieve the locators of the already-discovered lookup services */ - int k = 0; Iterator iter = discoveredLocators.iterator(); while(iter.hasNext()) { - ret[k++] = ((LocatorReg)iter.next()).l; + locators.add(((LocatorReg)iter.next()).l); }//end loop /* Append the locators of the not-yet-discovered lookup services */ iter = undiscoveredLocators.iterator(); while(iter.hasNext()) { - ret[k++] = ((LocatorReg)iter.next()).l; + locators.add(((LocatorReg)iter.next()).l); }//end loop - return ret; + return locators.toArray(new LookupLocator[locators.size()]); }//end getLocators /** @@ -801,7 +809,7 @@ abstract class AbstractLookupLocatorDisc * @see net.jini.discovery.DiscoveryLocatorManagement#addLocators * @see #removeLocators */ - public synchronized void addLocators(LookupLocator[] locators) { + public void addLocators(LookupLocator[] locators) { testSetForNull(locators); if (terminated) { throw new IllegalStateException("discovery terminated"); @@ -841,61 +849,60 @@ abstract class AbstractLookupLocatorDisc */ public void setLocators(LookupLocator[] locators) { testSetForNull(locators); - synchronized(this) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - HashMap groupsMap = new HashMap(1); - /* From the set of already-discovered locators, remove each - * element that is NOT in the input set of locators. - */ - Iterator iter = discoveredLocators.iterator(); - while(iter.hasNext()) { - LocatorReg reg = (LocatorReg)iter.next(); - if(!isArrayContains(locators, reg.l)) { - iter.remove(); - groupsMap.put(reg.getProxy(), reg.getMemberGroups()); - }//endif - }//end loop - /* From the set of yet-to-be-discovered locators, remove each - * element that is NOT in the input set of replacement locators. - * - * Note that if the discovery task is currently attempting to - * discover a locator from this set, and if that locator is not - * contained in the given input set of replacement locators (that - * is, it is no longer desired that that locator be discovered), - * then the discovery task, when it completes (either successfully - * or un-successfully) the attempt to discover that locator, will - * end all discovery processing with respect to the affected - * locator. - * - * To inform the discovery task -- upon its return from the - * unicast discovery process -- of the desire to terminate all - * discovery processing for that particular locator, the element - * in the set of undiscoveredLocators that corresponds to that - * locator is removed. This means that if the discovery attempt - * failed, the locator will no longer be considered one of the - * yet-to-be-discovered locators; and if the attempt succeeded, - * prevents the locator from being placed in the set of - * already-discovered locators. It also prevents any discarded - * or discovered events from being sent. - */ - iter = undiscoveredLocators.iterator(); - while(iter.hasNext()) { - LocatorReg reg = (LocatorReg)iter.next(); - if(!isArrayContains(locators, reg.l)) { - iter.remove(); - }//endif - }//end loop - /* Initiate discovery process for any new, un-discovered locators*/ - discoverLocators(locators); - /* Send a discarded event to all registered listeners for any - * locators that were removed by this method. - */ - if(!groupsMap.isEmpty() && !listeners.isEmpty()) { - addNotify((ArrayList)listeners.clone(), groupsMap, true); + if (terminated) throw new IllegalStateException("discovery terminated"); + + Map groupsMap = new HashMap(1); + /* From the set of already-discovered locators, remove each + * element that is NOT in the input set of locators. + */ + Iterator<LocatorReg> iter = discoveredLocators.iterator(); + List<LocatorReg> remove = new LinkedList<LocatorReg>(); + while(iter.hasNext()) { + LocatorReg reg = iter.next(); + if(!isArrayContains(locators, reg.l)) { + remove.add(reg); + groupsMap.put(reg.getProxy(), reg.getMemberGroups()); + }//endif + }//end loop + discoveredLocators.removeAll(remove); + /* From the set of yet-to-be-discovered locators, remove each + * element that is NOT in the input set of replacement locators. + * + * Note that if the discovery task is currently attempting to + * discover a locator from this set, and if that locator is not + * contained in the given input set of replacement locators (that + * is, it is no longer desired that that locator be discovered), + * then the discovery task, when it completes (either successfully + * or un-successfully) the attempt to discover that locator, will + * end all discovery processing with respect to the affected + * locator. + * + * To inform the discovery task -- upon its return from the + * unicast discovery process -- of the desire to terminate all + * discovery processing for that particular locator, the element + * in the set of undiscoveredLocators that corresponds to that + * locator is removed. This means that if the discovery attempt + * failed, the locator will no longer be considered one of the + * yet-to-be-discovered locators; and if the attempt succeeded, + * prevents the locator from being placed in the set of + * already-discovered locators. It also prevents any discarded + * or discovered events from being sent. + */ + iter = undiscoveredLocators.iterator(); + while(iter.hasNext()) { + LocatorReg reg = (LocatorReg)iter.next(); + if(!isArrayContains(locators, reg.l)) { + iter.remove(); }//endif - }//end sync(this) + }//end loop + /* Initiate discovery process for any new, un-discovered locators*/ + discoverLocators(locators); + /* Send a discarded event to all registered listeners for any + * locators that were removed by this method. + */ + if(!groupsMap.isEmpty() && !listeners.isEmpty()) { + addNotify( listeners, groupsMap, true); + }//endif }//end setLocators /** @@ -926,29 +933,25 @@ abstract class AbstractLookupLocatorDisc */ public void removeLocators(LookupLocator[] locators) { testSetForNull(locators); - synchronized(this) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - HashMap groupsMap = new HashMap(1); - for(int i=0; i<locators.length; i++) { - LocatorReg reg = removeDiscoveredLocator(locators[i]); - if(reg != null) {//removing an already-discovered reg - groupsMap.put(reg.getProxy(), reg.getMemberGroups()); - continue; - }//endif - reg = findReg(locators[i]); - if(reg != null) {//reg not yet discovered, stop discovery of it - undiscoveredLocators.remove(reg); - }//endif - }//end loop - /* Send a discarded event to all registered listeners for any - * locators that were removed by this method. - */ - if(!groupsMap.isEmpty() && !listeners.isEmpty()) { - addNotify((ArrayList)listeners.clone(), groupsMap, true); + if (terminated) throw new IllegalStateException("discovery terminated"); + Map groupsMap = new HashMap(1); + for(int i=0; i<locators.length; i++) { + LocatorReg reg = removeDiscoveredLocator(locators[i]); + if(reg != null) {//removing an already-discovered reg + groupsMap.put(reg.getProxy(), reg.getMemberGroups()); + continue; + }//endif + reg = findReg(locators[i]); + if(reg != null) {//reg not yet discovered, stop discovery of it + undiscoveredLocators.remove(reg); }//endif - }//end sync + }//end loop + /* Send a discarded event to all registered listeners for any + * locators that were removed by this method. + */ + if(!groupsMap.isEmpty() && !listeners.isEmpty()) { + addNotify(listeners, groupsMap, true); + }//endif }//end removeLocators /** @@ -965,19 +968,15 @@ abstract class AbstractLookupLocatorDisc * this method is called after the <code>terminate</code> * method has been called. */ - public synchronized LookupLocator[] getDiscoveredLocators() { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - int size = discoveredLocators.size(); - LookupLocator[] ret = new LookupLocator[size]; + public LookupLocator[] getDiscoveredLocators() { + if (terminated) throw new IllegalStateException("discovery terminated"); + List<LookupLocator> locators = new LinkedList<LookupLocator>(); /* Retrieve the locators of the already-discovered lookup services */ - int k = 0; - Iterator iter = discoveredLocators.iterator(); + Iterator<LocatorReg> iter = discoveredLocators.iterator(); while(iter.hasNext()) { - ret[k++] = ((LocatorReg)iter.next()).l; + locators.add(iter.next().l); }//end loop - return ret; + return locators.toArray(new LookupLocator[locators.size()]); }//end getDiscoveredLocators /** @@ -995,23 +994,20 @@ abstract class AbstractLookupLocatorDisc * this method is called after the <code>terminate</code> * method has been called. */ - public synchronized LookupLocator[] getUndiscoveredLocators() { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - LookupLocator[] locs = new LookupLocator[undiscoveredLocators.size()]; - Iterator iter = undiscoveredLocators.iterator(); + public LookupLocator[] getUndiscoveredLocators() { + if (terminated) throw new IllegalStateException("discovery terminated"); + List<LookupLocator> locators = new LinkedList<LookupLocator>(); + Iterator<LocatorReg> iter = undiscoveredLocators.iterator(); for(int i=0;iter.hasNext();i++) { - locs[i] = ((LocatorReg)iter.next()).l; + locators.add(iter.next().l); }//end loop - return locs; + return locators.toArray(new LookupLocator[locators.size()]); }//end getUndiscoveredLocators /** Initiates the discovery process for the lookup services having the * given locators. */ private void discoverLocators(LookupLocator[] lcts) { - assert Thread.holdsLock(this); discoverLocatorsCalled = true; if (lcts == null) return; LookupLocator lct; @@ -1020,7 +1016,7 @@ abstract class AbstractLookupLocatorDisc LocatorReg reg = findReg(lcts[i]);//in not-yet-discovered map? if(reg == null) { reg = new LocatorReg(lcts[i]); - addToMap(reg); + addAndQueueDiscoveryTaskIfAbsent(reg); }//endif }//end loop }//end discoverLocators @@ -1088,26 +1084,22 @@ abstract class AbstractLookupLocatorDisc * listeners that the locator has been discovered, and return true * to prevent retries from being queued. */ - synchronized (this) { - if(!undiscoveredLocators.contains(reg)) { - return true;//already removed, true ==> don't queue retry - }//endif - /* Discovery un-successful, leave in set, try new wakeup task */ - if(!b) { - return false;//this causes a retry to be queued - }//endif - /* Discovery was successful, move reg from undiscoveredLocators - * to discoveredLocators, and notify listeners - */ - undiscoveredLocators.remove(reg); - discoveredLocators.add(reg); - if(!listeners.isEmpty()) { - addNotify((ArrayList)listeners.clone(), + if(!undiscoveredLocators.contains(reg)) return true;//already removed, true ==> don't queue retry + /* Discovery un-successful, leave in set, try new wakeup task */ + if(!b) return false;//this causes a retry to be queued + /* Discovery was successful, move reg from undiscoveredLocators + * to discoveredLocators, and notify listeners + */ + if (undiscoveredLocators.remove(reg)){ //Atomic + discoveredLocators.add(reg); + if(!listeners.isEmpty()) { + addNotify(listeners, mapRegToGroups(reg.getProxy(), reg.getMemberGroups()), false); }//endif - return true;//done; don't queue any retries - }//end sync(this) + //done; don't queue any retries return true + } //else already removed, true ==> don't queue retry + return true; }//end regTryGetProxy /** From each element of the set of LocatorReg objects that correspond @@ -1116,15 +1108,13 @@ abstract class AbstractLookupLocatorDisc * in an array of ServiceRegistrar. */ private ServiceRegistrar[] buildServiceRegistrar() { - int k = 0; - ServiceRegistrar[] proxys = - new ServiceRegistrar[discoveredLocators.size()]; - Iterator iter = discoveredLocators.iterator(); + List<ServiceRegistrar> regs = new LinkedList<ServiceRegistrar>(); + Iterator<LocatorReg> iter = discoveredLocators.iterator(); while(iter.hasNext()) { - LocatorReg reg = (LocatorReg)iter.next(); - proxys[k++] = reg.getProxy(); + LocatorReg reg = iter.next(); + regs.add(reg.getProxy()); }//end loop - return proxys; + return regs.toArray(new ServiceRegistrar[regs.size()]); }//end buildServiceRegistrar /** @@ -1133,9 +1123,8 @@ abstract class AbstractLookupLocatorDisc * not yet been discovered, and queues a DiscoveryTask to attempt, * through unicast discovery, to discover the associated lookup service. */ - private void addToMap(LocatorReg reg) { - undiscoveredLocators.add(reg);//add to set of not-yet-discovered locs - reg.queueDiscoveryTask(); + private void addAndQueueDiscoveryTaskIfAbsent(LocatorReg reg) { + if (undiscoveredLocators.add(reg)) reg.queueDiscoveryTask(); }//end addToMap /** Determines whether or not the lookup service associated with the @@ -1153,19 +1142,13 @@ abstract class AbstractLookupLocatorDisc /** Add a notification task to the pending queue, and start an instance of * the Notifier thread if one isn't already running. */ - private void addNotify(ArrayList notifies, + private void addNotify(List notifies, Map groupsMap, boolean discard) { - synchronized (pendingNotifies) { - pendingNotifies.addLast(new NotifyTask(notifies, - groupsMap, - discard)); - if (notifierThread == null) { - notifierThread = new Notifier(); - notifierThread.start(); - }//endif - }//end sync + pendingNotifies.addLast(new NotifyTask(notifies, + groupsMap, + discard)); }//end addNotify /** Convenience method used to remove the LocatorReg - corresponding to @@ -1177,7 +1160,7 @@ abstract class AbstractLookupLocatorDisc while(iter.hasNext()) { LocatorReg reg = (LocatorReg)iter.next(); if(reg.l.equals(lct)) { - iter.remove(); + discoveredLocators.remove(reg); return reg; }//endif }//end loop @@ -1194,14 +1177,25 @@ abstract class AbstractLookupLocatorDisc discoveryWakeupMgr.cancelAll();//cancel all tickets }//endif /* Cancel/remove pending tasks from the task manager and terminate */ - if(discoveryTaskMgr != null) { - ArrayList pendingTasks = discoveryTaskMgr.getPending(); - for(int i=0;i<pendingTasks.size();i++) { - RetryTask pendingTask = (RetryTask)pendingTasks.get(i); - pendingTask.cancel();//cancel wakeup ticket - discoveryTaskMgr.remove(pendingTask);//remove from task mgr - }//end loop - discoveryTaskMgr.terminate();//interrupt all active tasks + if(discoveryExecutor != null) { + synchronized (discoveryTasks){ + Iterator<RetryTask> it = discoveryTasks.iterator(); + while (it.hasNext()){ + it.next().cancel();//cancel wakeup ticket + } + } + List<Runnable> pendingTasks = discoveryExecutor.shutdownNow(); + Iterator<Runnable> ir = pendingTasks.iterator(); + while (ir.hasNext()){ + Runnable pendingTask = ir.next(); + if (pendingTask instanceof RetryTask){ + ((RetryTask) pendingTask).cancel();//cancel wakeup ticket + System.err.println("Cancelled RetryTask"); + } else if (pendingTask instanceof Future) { + ((Future) pendingTask).cancel(true); + System.err.println("Task not instanceof RetryTask: " + pendingTask); + } + } }//endif }//end terminateTaskMgr @@ -1214,12 +1208,18 @@ abstract class AbstractLookupLocatorDisc }//end isArrayContains /* Convenience method useful for debugging. */ - private synchronized void printMap () { + public String toString() { + StringBuilder sb = new StringBuilder(300); + String message = "Undiscovered Locators reg:"; + char lineReturn = '\n'; Iterator iter = undiscoveredLocators.iterator(); while(iter.hasNext()) { LocatorReg reg = (LocatorReg)iter.next(); - System.out.println("printMap reg:" + reg.l); + sb.append(message); + sb.append(reg.l); + sb.append(lineReturn); }//end loop + return sb.toString(); }//end printMap /** @@ -1311,40 +1311,37 @@ abstract class AbstractLookupLocatorDisc */ void beginDiscovery(final LookupLocator[] locators) { - synchronized(this) { - if (locators == null) { - return; - } - testSetForNull(locators); - if (initialUnicastDelayRange > 0) { - discoveryWakeupMgr.schedule( - System.currentTimeMillis() + - (long) (Math.random() * initialUnicastDelayRange), - new Runnable() { - public void run() { - synchronized (AbstractLookupLocatorDiscovery.this) { - if (terminated || discoverLocatorsCalled) { - // discoverLocatorsCalled will be true - // if there has been an intervening - // addLocators or setLocators call. - return; - } - discoverLocators(locators); - } - } - } - ); - } else { - synchronized (AbstractLookupLocatorDiscovery.this){ - discoverLocators(locators); + notifierThread.start(); + if (locators == null) { + return; + } + testSetForNull(locators); + if (initialUnicastDelayRange > 0) { + discoveryWakeupMgr.schedule( + System.currentTimeMillis() + + (long) (Math.random() * initialUnicastDelayRange), + new Runnable() { + public void run() { + synchronized (AbstractLookupLocatorDiscovery.this) { + if (terminated || discoverLocatorsCalled) { + // discoverLocatorsCalled will be true + // if there has been an intervening + // addLocators or setLocators call. + return; + } + } + discoverLocators(locators); + } } - } - } + ); + } else { + discoverLocators(locators); + } } private static class Initializer{ ProxyPreparer registrarPreparer; - TaskManager discoveryTaskMgr; + ExecutorService discoveryTaskMgr; WakeupManager discoveryWakeupMgr; Long initialUnicastDelayRange; } @@ -1371,11 +1368,15 @@ abstract class AbstractLookupLocatorDisc new BasicProxyPreparer()); /* Task manager */ try { - i.discoveryTaskMgr = (TaskManager)config.getEntry(COMPONENT_NAME, - "taskManager", - TaskManager.class); + i.discoveryTaskMgr = (ExecutorService)config.getEntry(COMPONENT_NAME, + "executorService", + ExecutorService.class); } catch(NoSuchEntryException e) { /* use default */ - i.discoveryTaskMgr = new TaskManager(MAX_N_TASKS,(15*1000),1.0f); + i.discoveryTaskMgr = + new ThreadPoolExecutor(1, MAX_N_TASKS , + 15L, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("LookupLocatorDiscovery", false)); } /* Wakeup manager */ try {
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupLocatorDiscovery.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupLocatorDiscovery.java?rev=1554723&r1=1554722&r2=1554723&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupLocatorDiscovery.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/LookupLocatorDiscovery.java Thu Jan 2 02:45:07 2014 @@ -17,40 +17,10 @@ */ package net.jini.discovery; -import com.sun.jini.config.Config; -import com.sun.jini.discovery.Discovery; -import com.sun.jini.discovery.DiscoveryConstraints; -import com.sun.jini.discovery.UnicastResponse; -import com.sun.jini.discovery.internal.MultiIPDiscovery; -import com.sun.jini.logging.Levels; -import com.sun.jini.logging.LogUtil; -import com.sun.jini.thread.RetryTask; -import com.sun.jini.thread.TaskManager; -import com.sun.jini.thread.WakeupManager; -import java.io.IOException; -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.net.Socket; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.Set; -import java.util.logging.Level; import java.util.logging.Logger; import net.jini.config.Configuration; import net.jini.config.ConfigurationException; -import net.jini.config.EmptyConfiguration; -import net.jini.config.NoSuchEntryException; -import net.jini.core.constraint.InvocationConstraints; -import net.jini.core.constraint.MethodConstraints; -import net.jini.core.constraint.RemoteMethodControl; import net.jini.core.discovery.LookupLocator; -import net.jini.core.lookup.ServiceRegistrar; -import net.jini.security.BasicProxyPreparer; -import net.jini.security.ProxyPreparer; /** * This class encapsulates the functionality required of an entity that Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java?rev=1554723&r1=1554722&r2=1554723&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java Thu Jan 2 02:45:07 2014 @@ -23,7 +23,6 @@ import com.sun.jini.constants.ThrowableC import com.sun.jini.logging.Levels; import com.sun.jini.logging.LogManager; import com.sun.jini.proxy.ConstrainableProxyUtil; -import com.sun.jini.thread.TaskManager; import java.lang.reflect.Method; import java.rmi.RemoteException; import java.util.ArrayList; @@ -32,6 +31,12 @@ import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.LogRecord; import java.util.logging.Logger; @@ -43,6 +48,7 @@ import net.jini.core.lease.LeaseExceptio import net.jini.core.lease.LeaseMap; import net.jini.core.lease.LeaseMapException; import net.jini.core.lease.UnknownLeaseException; +import org.apache.river.impl.thread.NamedThreadFactory; /** * Provides for the systematic renewal and overall management of a set @@ -175,16 +181,17 @@ import net.jini.core.lease.UnknownLeaseE * should have durations exceeding the <code>roundTripTime</code>. * This entry is obtained in the constructor. * </table> - * <table summary="Describes the taskManager configuration entry" + * <table summary="Describes the executorService configuration entry" * border="0" cellpadding="2"> * <tr valign="top"> * <th scope="col" summary="layout"> <font size="+1">•</font> * <th scope="col" align="left" colspan="2"> <font size="+1"><code> - * taskManager</code></font> + * executorService</code></font> * <tr valign="top"> <td>   <th scope="row" align="right"> - * Type: <td> {@link TaskManager} + * Type: <td> {@link ExecutorService} * <tr valign="top"> <td>   <th scope="row" align="right"> - * Default: <td> <code>new TaskManager(11, 15000, 1.0f)</code> + * Default: <td> <code>new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS, + * new LinkedBlockingQueue())</code> * <tr valign="top"> <td>   <th scope="row" align="right"> * Description: <td> The object used to manage queuing tasks * involved with renewing leases and sending notifications. The @@ -193,7 +200,7 @@ import net.jini.core.lease.UnknownLeaseE * seconds before removing idle threads, and uses a load factor of * 1.0 when determining whether to create a new thread. Note that * the implementation of the renewal algorithm includes an assumption - * that the <code>TaskManager</code> uses a load factor of 1.0. + * that the <code>ExecutorService</code> uses a load factor of 1.0. * </table> * * <a name="logging"> @@ -300,7 +307,7 @@ import net.jini.core.lease.UnknownLeaseE * are added to it and the {@link LeaseMap#renewAll} method is called. Otherwise, the * last lease is renewed directly. * <p> - * The <code>TaskManager</code> that manages the renewal threads has a bound on + * The <code>ExecutorService</code> that manages the renewal threads has a bound on * the number of simultaneous threads it will support. The renewal time of * leases may be adjusted earlier in time to reduce the likelihood that the * renewal of a lease will be delayed due to exhaustion of the thread pool. @@ -355,8 +362,15 @@ public class LeaseRenewalManager { /** Time window in which to look for batchable leases */ private long renewBatchTimeWindow = 1000 * 60 * 5; - /** Task manager for queuing and renewing leases */ - TaskManager taskManager = new TaskManager(11, 1000 * 15, 1.0f); + /** Task manager for queuing and renewing leases + * NOTE: test failures occur with queue's that have capacity, + * no test failures occur with SynchronousQueue, for the time + * being, until the cause is sorted out we may need to rely on + * a larger pool, if necessary. TaskManager is likely to have + * lower throughput capacity that ExecutorService with a + * SynchronousQueue although this hasn't been confirmed yet. + */ + final ExecutorService leaseRenewalExecutor; /** * The worst-case renewal round-trip-time @@ -380,7 +394,7 @@ public class LeaseRenewalManager { */ private List calcList; - private final class RenewTask implements TaskManager.Task { + private final class RenewTask implements Runnable { /** Entries of leases to renew (if multiple, all can be batched) */ private final List bList; @@ -454,11 +468,6 @@ public class LeaseRenewalManager { } } - /** No ordering. */ - public boolean runAfter(List tasks, int size) { - return false; - } - /** * Find any expired leases, remove them from bList and * leaseInRenew, and return any with listeners. @@ -706,6 +715,11 @@ public class LeaseRenewalManager { * that initially manages no leases. */ public LeaseRenewalManager() { + leaseRenewalExecutor = + new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), + new NamedThreadFactory("LeaseRenewalManager",true), + new CallerRunsPolicy()); } /** @@ -731,8 +745,12 @@ public class LeaseRenewalManager { renewalRTT = Config.getLongEntry( config, LRM, "roundTripTime", renewalRTT, 1, Long.MAX_VALUE); - taskManager = (TaskManager) Config.getNonNullEntry( - config, LRM, "taskManager", TaskManager.class, taskManager); + leaseRenewalExecutor = Config.getNonNullEntry( + config, LRM, "executorService", ExecutorService.class, + new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), + new NamedThreadFactory("LeaseRenewalManager",false), + new CallerRunsPolicy()) ); } /** @@ -760,6 +778,10 @@ public class LeaseRenewalManager { long desiredExpiration, LeaseListener listener) { + leaseRenewalExecutor = new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), + new NamedThreadFactory("LeaseRenewalManager",true), + new CallerRunsPolicy()); renewUntil(lease, desiredExpiration, listener); } @@ -787,7 +809,7 @@ public class LeaseRenewalManager { * <code>null</code> * @see #renewUntil */ - public void renewUntil(Lease lease, + public final void renewUntil(Lease lease, long desiredExpiration, LeaseListener listener) { @@ -1152,6 +1174,10 @@ public class LeaseRenewalManager { remove(lease); lease.cancel(); } + + public void close(){ + leaseRenewalExecutor.shutdown(); + } /** * Removes a given lease from the managed set of leases; but does @@ -1192,7 +1218,9 @@ public class LeaseRenewalManager { * Subtract one to account for the queuer thread, which should not be * counted. */ - int maxThreads = taskManager.getMaxThreads() - 1; + int maxThreads = leaseRenewalExecutor instanceof ThreadPoolExecutor ? + ((ThreadPoolExecutor)leaseRenewalExecutor).getMaximumPoolSize() - 1 + : 10; if (calcList == null) { calcList = new ArrayList(maxThreads); } @@ -1271,7 +1299,7 @@ public class LeaseRenewalManager { if (queuer == null) { if (newWakeup < Long.MAX_VALUE) { queuer = new QueuerTask(newWakeup); - taskManager.add(queuer); + leaseRenewalExecutor.execute(queuer); } } else if (newWakeup < queuer.wakeup || (newWakeup == Long.MAX_VALUE && leaseInRenew.isEmpty())) @@ -1522,19 +1550,15 @@ public class LeaseRenewalManager { return ((Entry) leases.lastKey()).actualRenew; } - private class QueuerTask implements TaskManager.Task { + private class QueuerTask implements Runnable { /** When to next wake up and queue a new renew task */ - long wakeup; + private long wakeup; QueuerTask(long wakeup) { this.wakeup = wakeup; } - /** No ordering */ - public boolean runAfter(List tasks, int size) { - return false; - } public void run() { synchronized (LeaseRenewalManager.this) { @@ -1546,7 +1570,7 @@ public class LeaseRenewalManager { final long now = System.currentTimeMillis(); long delta = wakeup - now; if (delta <= 0) { - taskManager.add(new RenewTask(now)); + leaseRenewalExecutor.execute(new RenewTask(now)); } else { LeaseRenewalManager.this.wait(delta); }
