Author: peter_firmstone Date: Tue May 7 10:35:23 2013 New Revision: 1479848
URL: http://svn.apache.org/r1479848 Log: Add meaningful messages to PreferredClassLoader logging output. ServiceDiscoveryManager concurrency updates: uses java.util.concurrent, final and volatile fields, and safe publication, and prevents the "this" reference from escaping to other threads during construction. A new Terminator thread allows cache termination to be handed off, so that blocking lookup methods can return closer to their waitDur period without having to perform the remote method calls made during cache termination. Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/loader/pref/PreferredClassLoader.java river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryEvent.java river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/loader/pref/PreferredClassLoader.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/loader/pref/PreferredClassLoader.java?rev=1479848&r1=1479847&r2=1479848&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/loader/pref/PreferredClassLoader.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/loader/pref/PreferredClassLoader.java Tue May 7 10:35:23 2013 @@ -420,7 +420,7 @@ public class PreferredClassLoader extend } } } catch (IOException ex) { - Logger.getLogger(PreferredClassLoader.class.getName()).log(Level.SEVERE, null, ex); + Logger.getLogger(PreferredClassLoader.class.getName()).log(Level.SEVERE, "Unable to access preferred resources", ex); except = ex; } finally { try { @@ -428,7 +428,7 @@ public class PreferredClassLoader extend prefIn.close(); } } catch (IOException ex) { - Logger.getLogger(PreferredClassLoader.class.getName()).log(Level.SEVERE, null, ex); + Logger.getLogger(PreferredClassLoader.class.getName()).log(Level.SEVERE, "Problem closing preferred resources input stream", 
ex); } } Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryEvent.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryEvent.java?rev=1479848&r1=1479847&r2=1479848&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryEvent.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryEvent.java Tue May 7 10:35:23 2013 @@ -41,13 +41,13 @@ public class ServiceDiscoveryEvent exten * * @serial */ - private ServiceItem preEventItem = null; + private final ServiceItem preEventItem; /** Represents the state of the service after the event. * * @serial */ - private ServiceItem postEventItem = null; + private final ServiceItem postEventItem; /** * The constructor of <code>ServiceDiscoveryEvent</code> takes @@ -111,10 +111,12 @@ public class ServiceDiscoveryEvent exten this.preEventItem = new ServiceItem(preEventItem.serviceID, preEventItem.service, preEventItem.attributeSets); + else this.preEventItem = null; if(postEventItem != null) this.postEventItem = new ServiceItem(postEventItem.serviceID, postEventItem.service, postEventItem.attributeSets); + else this.postEventItem = null; } Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1479848&r1=1479847&r2=1479848&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Tue May 7 10:35:23 2013 @@ -64,6 +64,8 @@ import java.rmi.server.RemoteObject; import java.rmi.server.UnicastRemoteObject; import java.util.ArrayList; +import 
java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.HashMap; import java.util.LinkedList; @@ -71,8 +73,17 @@ import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import java.util.Map; +import java.util.Queue; import java.util.Random; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * The <code>ServiceDiscoveryManager</code> class is a helper utility class @@ -693,10 +704,11 @@ public class ServiceDiscoveryManager { private final static class ServiceDiscoveryListenerImpl implements ServiceDiscoveryListener { - final ArrayList items = new ArrayList(1); + private final List items = new LinkedList(); + public synchronized void serviceAdded(ServiceDiscoveryEvent event) { - items.add(event.getPostEventServiceItem()); - this.notifyAll(); + items.add(event.getPostEventServiceItem()); + this.notifyAll(); } public void serviceRemoved(ServiceDiscoveryEvent event){ } public void serviceChanged(ServiceDiscoveryEvent event){ } @@ -806,10 +818,12 @@ public class ServiceDiscoveryManager { /** A wrapper class for a ServiceRegistrar. 
*/ private final static class ProxyReg { private final ServiceRegistrar proxy; + private final int hash; public ProxyReg(ServiceRegistrar proxy) { if(proxy == null) throw new IllegalArgumentException ("proxy cannot be null"); this.proxy = proxy; + hash = proxy.hashCode(); }//end constructor public boolean equals(Object obj) { @@ -819,7 +833,7 @@ public class ServiceDiscoveryManager { }//end equals public int hashCode() { - return getProxy().hashCode(); + return hash; }//end hashCode /** @@ -843,6 +857,49 @@ public class ServiceDiscoveryManager { "failure occurred while renewing an event lease", false); } }//end class ServiceDiscoveryManager.LeaseListenerImpl + + /** Allows termination of LookupCacheImpl so blocking lookup can return + * quickly + */ + private final class LookupCacheTerminator extends Thread { + private final BlockingQueue<LookupCacheImpl> cacheList = new LinkedBlockingQueue<LookupCacheImpl>(20); + LookupCacheTerminator(){ + super("SDM lookup cache terminator"); + setDaemon(true); + } + + public void start(){ + synchronized (this){ + started = true; + } + super.start(); + } + + public void run(){ + while (!isInterrupted()) { + try { + LookupCacheImpl cache = cacheList.take(); + synchronized (cache){ + cache.terminate(); + } + } catch (InterruptedException ex) { + logger.log(Level.FINE, "SDM lookup cache terminator interrupted", ex); + break; + } + } + } + + void terminate(LookupCacheImpl cache){ + boolean added = cacheList.offer(cache); + if (!added) { // happens if cacheList is full. + // Do it yourself you lazy caller thread! Can't you see I'm busy? + synchronized (cache){ + cache.terminate(); + } + } + } + + } /** Internal implementation of the LookupCache interface. 
Instances of * this class are used in the blocking versions of lookup() and are @@ -857,10 +914,14 @@ public class ServiceDiscoveryManager { private final class LookupListener implements RemoteEventListener, ServerProxyTrust { - public LookupListener() throws ExportException { - lookupListenerProxy = - (RemoteEventListener)lookupListenerExporter.export(this); + public LookupListener() { }//end constructor + + // Moved export from constructor and LookupCacheImpl constructor + // to avoid publishing partially constructed object. + RemoteEventListener export() throws ExportException { + return (RemoteEventListener)lookupListenerExporter.export(this); + } public void notify(RemoteEvent evt) { ServiceEvent theEvent = (ServiceEvent)evt; @@ -919,28 +980,32 @@ public class ServiceDiscoveryManager { tmpl, lookupListenerProxy, duration); - eventReg.lookupsPending++; - synchronized(serviceIdMap) { - /* Cancel the lease if the cache has been terminated */ - if(bCacheTerminated) { - cancelLease(eventReg.lease); - return; + // eventReg is a new object not visible to other threads yet. + eventReg.lookupsPending++; + /* Cancel the lease if the cache has been terminated */ + if(bCacheTerminated) { + // eventReg.lease is final and is already safely published + cancelLease(eventReg.lease); + return; + } else { + // eventReg will be published safely to other threads + // via eventRegMap and LookupTask + EventReg existed = eventRegMap.putIfAbsent(reg, eventReg); + if (existed == null ) { + (new LookupTask(reg, this.getSeqN(), eventReg)).run(); } else { - eventRegMap.put(reg, eventReg); - }//endif - }//end sync(serviceIdMap) + // Another eventReg.lease exists, cancel new lease. 
+ cancelLease(eventReg.lease); + } + }//endif /* Execute the LookupTask only if there were no problems */ - (new LookupTask(reg, this.getSeqN(), eventReg)).run(); + } catch (Exception e) { - boolean cacheTerminated; - synchronized(serviceIdMap) { - cacheTerminated = bCacheTerminated; - }//end sync - ServiceDiscoveryManager.this.fail + ServiceDiscoveryManager.this.fail (e, reg.getProxy(),this.getClass().getName(),"run", "Exception occurred while attempting to register " +"with the lookup service event mechanism", - cacheTerminated); + bCacheTerminated); } logger.finest("ServiceDiscoveryManager - RegisterListenerTask " +"completed"); @@ -963,7 +1028,7 @@ public class ServiceDiscoveryManager { matches = proxy.lookup(tmpl, Integer.MAX_VALUE); } catch (Exception e) { boolean cacheTerminated; - synchronized(serviceIdMap) { + synchronized(eReg) { eReg.lookupsPending--; cacheTerminated = bCacheTerminated; }//end sync @@ -979,42 +1044,43 @@ public class ServiceDiscoveryManager { +"returned by call to lookup() method contains " +"null 'items' field"); } - synchronized(serviceIdMap) { - /* 1. Cleanup "orphaned" itemReg's. */ - Iterator iter = (serviceIdMap.entrySet()).iterator(); - while(iter.hasNext()) { - Map.Entry e = (Map.Entry)iter.next(); - ServiceID srvcID = (ServiceID)e.getKey(); - ServiceItem itemInSnapshot = findItem(srvcID, - matches.items); - if(itemInSnapshot != null) continue;//not an orphan - ServiceItemReg itemReg = (ServiceItemReg)e.getValue(); - UnmapProxyTask t = new UnmapProxyTask(reg, - itemReg, - srvcID, - taskSeqN++); - cacheTaskMgr.add(t); - }//end loop - /* 2. Handle "new" and "old" items from the given lookup */ - for(int i=0; i<(matches.items).length; i++) { - /* Skip items with null service field (Bug 4378751) */ - if( (matches.items[i]).service == null ) continue; - NewOldServiceTask t = - new NewOldServiceTask(reg, - matches.items[i], - false, - taskSeqN++); + /* 1. Cleanup "orphaned" itemReg's. 
*/ + Iterator<Map.Entry<ServiceID,ServiceItemReg>> iter = (serviceIdMap.entrySet()).iterator(); + while(iter.hasNext()) { + Map.Entry<ServiceID,ServiceItemReg> e = iter.next(); + ServiceID srvcID = e.getKey(); + ServiceItem itemInSnapshot = findItem(srvcID, + matches.items); + if(itemInSnapshot != null) continue;//not an orphan + ServiceItemReg itemReg = e.getValue(); + UnmapProxyTask t = new UnmapProxyTask(reg, + itemReg, + srvcID, + taskSeqN.getAndIncrement()); + cacheTaskMgr.add(t); + }//end loop + /* 2. Handle "new" and "old" items from the given lookup */ + for(int i=0; i<(matches.items).length; i++) { + /* Skip items with null service field (Bug 4378751) */ + if( (matches.items[i]).service == null ) continue; + NewOldServiceTask t = + new NewOldServiceTask(reg, + matches.items[i], + false, + taskSeqN.getAndIncrement()); + cacheTaskMgr.add(t); + }//end loop + /* 3. Handle events that came in prior to lookup */ + synchronized (eReg){ + eReg.lookupsPending--; + Iterator it = eReg.pending.iterator() ; + while (it.hasNext()) { + NotifyEventTask t = (NotifyEventTask) it.next(); + t.thisTaskSeqN = taskSeqN.getAndIncrement(); // assign new seqN cacheTaskMgr.add(t); - }//end loop - /* 3. 
Handle events that came in prior to lookup */ - eReg.lookupsPending--; - for (iter = eReg.pending.iterator(); iter.hasNext(); ) { - NotifyEventTask t = (NotifyEventTask) iter.next(); - t.thisTaskSeqN = taskSeqN++; // assign new seqN - cacheTaskMgr.add(t); - } - eReg.pending.clear(); - }//end sync(serviceIdMap) + } + eReg.pending.clear(); + }//end sync(eReg) logger.finest("ServiceDiscoveryManager - LookupTask " +"completed"); }//end run @@ -1066,31 +1132,25 @@ public class ServiceDiscoveryManager { public void run() { logger.finest("ServiceDiscoveryManager - ProxyRegDropTask " +"started"); - synchronized(serviceIdMap) { - //lease has already been cancelled by removeProxyReg - if(eventRegMap.containsKey(reg)) { - eventRegMap.remove(reg); - } - }//end sync(serviceIdMap) + //lease has already been cancelled by removeProxyReg + eventRegMap.remove(reg); /* For each itemReg in the serviceIdMap, disassociate the * lookup service referenced here from the itemReg; and * if the itemReg then has no more lookup services associated * with it, remove the itemReg from the map and send a * service removed event. 
*/ - synchronized(serviceIdMap) { - Iterator iter = (serviceIdMap.entrySet()).iterator(); - while(iter.hasNext()) { - Map.Entry e = (Map.Entry)iter.next(); - ServiceID srvcID = (ServiceID)e.getKey(); - ServiceItemReg itemReg = (ServiceItemReg)e.getValue(); - UnmapProxyTask t = new UnmapProxyTask(reg, - itemReg, - srvcID, - taskSeqN++); - cacheTaskMgr.add(t); - }//end loop - }//end sync(serviceIdMap) + Iterator iter = (serviceIdMap.entrySet()).iterator(); + while(iter.hasNext()) { + Map.Entry e = (Map.Entry)iter.next(); + ServiceID srvcID = (ServiceID)e.getKey(); + ServiceItemReg itemReg = (ServiceItemReg)e.getValue(); + UnmapProxyTask t = new UnmapProxyTask(reg, + itemReg, + srvcID, + taskSeqN.getAndIncrement()); + cacheTaskMgr.add(t); + }//end loop logger.finest("ServiceDiscoveryManager - ProxyRegDropTask " +"completed"); }//end run @@ -1224,16 +1284,12 @@ public class ServiceDiscoveryManager { logger.finest("ServiceDiscoveryManager - " +"ServiceDiscardTimerTask started"); /* Exit if this cache has already been terminated. */ - synchronized(serviceIdMap) { - if(bCacheTerminated) return; - }//end sync(serviceIdMap) + if(bCacheTerminated) return; /* Simply return if a MATCH_NOMATCH event arrived for this * item prior to this task running and as a result, the item * was removed from the map. */ - synchronized(serviceIdMap) { - if(!serviceIdMap.containsKey(serviceID)) return; - }//end sync(serviceIdMap) + if(!serviceIdMap.containsKey(serviceID)) return; long curDur = endTime-System.currentTimeMillis(); synchronized(serviceDiscardMutex) { /* Wait until interrupted or time expires */ @@ -1242,9 +1298,7 @@ public class ServiceDiscoveryManager { serviceDiscardMutex.wait(curDur); } catch(InterruptedException e){ } /* Exit if this cache was terminated while waiting. */ - synchronized(serviceIdMap) { - if(bCacheTerminated) return; - }//end sync(serviceIdMap) + if(bCacheTerminated) return; /* Either the wait period has completed or has been * interrupted. 
If the service ID is no longer in * in the serviceIdMap, then it's assumed that a @@ -1255,9 +1309,7 @@ public class ServiceDiscoveryManager { * re-discovered when it comes back on line. In that * case, exit the thread. */ - synchronized(serviceIdMap) { - if(!serviceIdMap.containsKey(serviceID)) return; - }//end sync(serviceIdMap) + if(!serviceIdMap.containsKey(serviceID)) return; curDur = endTime-System.currentTimeMillis(); }//end loop }//end sync @@ -1283,10 +1335,7 @@ public class ServiceDiscoveryManager { * event was already sent when the service was originally * discarded. */ - ServiceItemReg itemReg = null; - synchronized(serviceIdMap) { - itemReg = (ServiceItemReg)serviceIdMap.get(serviceID); - }//end sync(serviceIdMap) + ServiceItemReg itemReg = serviceIdMap.get(serviceID); if(itemReg != null) { ServiceItem item = null; ServiceItem filteredItem = null; @@ -1325,6 +1374,7 @@ public class ServiceDiscoveryManager { itemToSend = itemReg.filteredItem; }//end sync(itemReg) addServiceNotify(itemToSend); + }//endif logger.finest("ServiceDiscoveryManager - " +"ServiceDiscardTimerTask completed"); @@ -1400,21 +1450,24 @@ public class ServiceDiscoveryManager { +"started"); boolean changed = false; ServiceItemReg itemReg; - synchronized(serviceIdMap) { - itemReg = (ServiceItemReg)serviceIdMap.get(thisTaskSid); - if (itemReg == null) { - if( !eventRegMap.containsKey(reg) ) { - /* reg must have been discarded, simply return */ - logger.finest("ServiceDiscoveryManager - " - +"NewOldServiceTask completed"); - return; - }//endif - itemReg = new ServiceItemReg( reg.getProxy(), srvcItem ); - serviceIdMap.put( thisTaskSid, itemReg ); - } else { - changed = true; - } - }//end sync(serviceIdMap) + itemReg = (ServiceItemReg)serviceIdMap.get(thisTaskSid); + if (itemReg == null) { + if( !eventRegMap.containsKey(reg) ) { + /* reg must have been discarded, simply return */ + logger.finest("ServiceDiscoveryManager - " + +"NewOldServiceTask completed"); + return; + }//endif + itemReg = 
new ServiceItemReg( reg.getProxy(), srvcItem ); + ServiceItemReg existed = serviceIdMap.putIfAbsent( thisTaskSid, itemReg ); + if (existed != null) { + itemReg = existed; + // Probably changed while we were stuffing around. + changed = true; + } + } else { + changed = true; + } if(changed) {//a. old, previously discovered item itemMatchMatchChange(reg.getProxy(), srvcItem, itemReg, matchMatchEvent); @@ -1500,25 +1553,25 @@ public class ServiceDiscoveryManager { private final static int ITEM_CHANGED = 3; /* The listener that receives remote events from the lookup services */ - private LookupListener lookupListener; + private final LookupListener lookupListener; /* Exporter for the remote event listener (lookupListener) */ - private Exporter lookupListenerExporter; + private volatile Exporter lookupListenerExporter; /* Proxy to the listener that receives remote events from lookups */ - private RemoteEventListener lookupListenerProxy; + private volatile RemoteEventListener lookupListenerProxy; /** Task manager for the various tasks executed by this LookupCache */ - private TaskManager cacheTaskMgr; + private volatile TaskManager cacheTaskMgr; /* Flag that indicates if the LookupCache has been terminated. 
*/ - private boolean bCacheTerminated = false; + private volatile boolean bCacheTerminated = false; /* Contains the ServiceDiscoveryListener's that receive local events */ private final ArrayList<ServiceDiscoveryListener> sItemListeners = new ArrayList<ServiceDiscoveryListener>(1); /* Map from ServiceID to ServiceItemReg */ - private final HashMap<ServiceID,ServiceItemReg> serviceIdMap = new HashMap<ServiceID,ServiceItemReg>(); + private final ConcurrentMap<ServiceID,ServiceItemReg> serviceIdMap = new ConcurrentHashMap<ServiceID,ServiceItemReg>(); /* Map from ProxyReg to EventReg: (proxyReg, {source,id,seqNo,lease})*/ - private final HashMap<ProxyReg,EventReg> eventRegMap = new HashMap<ProxyReg,EventReg>(); + private final ConcurrentMap<ProxyReg,EventReg> eventRegMap = new ConcurrentHashMap<ProxyReg,EventReg>(); /* Template current cache instance should use for primary matching */ - private ServiceTemplate tmpl; + private final ServiceTemplate tmpl; /* Filter current cache instance should use for secondary matching */ - private ServiceItemFilter filter = null; + private final ServiceItemFilter filter; /* Desired lease duration to request from lookups' event mechanisms */ private final long leaseDuration; /* Log the time when the cache gets created. This value is used to @@ -1526,7 +1579,7 @@ public class ServiceDiscoveryManager { */ private final long startTime = System.currentTimeMillis(); /** For tasks waiting on verification events after service discard */ - private TaskManager serviceDiscardTimerTaskMgr; + private volatile TaskManager serviceDiscardTimerTaskMgr; /* Thread mutex used to interrupt all ServiceDiscardTimerTasks */ private final Object serviceDiscardMutex = new Object(); /** Whenever a ServiceIdTask is created in this cache, it is assigned @@ -1536,7 +1589,7 @@ public class ServiceDiscoveryManager { * the sequence number assigned to the most recently created * ServiceIdTask. 
*/ - private long taskSeqN = 0; + private final AtomicLong taskSeqN = new AtomicLong(); public LookupCacheImpl(ServiceTemplate tmpl, ServiceItemFilter filter, @@ -1546,24 +1599,17 @@ public class ServiceDiscoveryManager { this.tmpl = copyServiceTemplate(tmpl); this.leaseDuration = leaseDuration; this.filter = filter; - initCache(); lookupListener = new LookupListener(); - synchronized(sItemListeners) { - if(sListener != null ) sItemListeners.add(sListener); - }//end sync(sItemListeners) - + if(sListener != null ) sItemListeners.add(sListener); }//end constructor // This method's javadoc is inherited from an interface of this class public void terminate() { - synchronized(serviceIdMap) { + synchronized(this) { if(bCacheTerminated) return;//allow for multiple terminations bCacheTerminated = true; }//end sync - synchronized(caches) { - int index = caches.indexOf(this); - if(index != -1) caches.remove(index); - }//end sync + caches.remove(this); /* Terminate all tasks: first, terminate this cache's TaskManager*/ terminateTaskMgr(cacheTaskMgr); /* Terminate ServiceDiscardTimerTasks running for this cache */ @@ -1571,15 +1617,13 @@ public class ServiceDiscoveryManager { terminateTaskMgr(serviceDiscardTimerTaskMgr); }//end sync(serviceDiscardMutex) /* Cancel all event registration leases held by this cache. */ - synchronized(serviceIdMap) { - Set set = eventRegMap.entrySet(); - Iterator iter = set.iterator(); - while(iter.hasNext()) { - Map.Entry e = (Map.Entry)iter.next(); - EventReg eReg = (EventReg)e.getValue(); - cancelLease(eReg.lease); - }//end loop - }//end sync(serviceIdMap) + Set set = eventRegMap.entrySet(); + Iterator iter = set.iterator(); + while(iter.hasNext()) { + Map.Entry e = (Map.Entry)iter.next(); + EventReg eReg = (EventReg)e.getValue(); + cancelLease(eReg.lease); + }//end loop /* Un-export the remote listener for events from lookups. 
*/ try { lookupListenerExporter.unexport(true); @@ -1658,12 +1702,7 @@ public class ServiceDiscoveryManager { /* Returns the iterator of entry set in the copy of the ServiceIdMap */ private Iterator getServiceIdMapEntrySetIterator() { - HashMap serviceIdMapCopy; - synchronized(serviceIdMap) { - serviceIdMapCopy = (HashMap)serviceIdMap.clone(); - } - Set set = serviceIdMapCopy.entrySet(); - return set.iterator(); + return serviceIdMap.entrySet().iterator(); }//end LookupCacheImpl.getServiceIdMapEntrySetIterator /** This method returns a <code>ServiceItem</code> array containing @@ -1692,7 +1731,7 @@ public class ServiceDiscoveryManager { * <code>filter</code> is applied. */ private ServiceItem[] getServiceItems(ServiceItemFilter filter2) { - ArrayList items = new ArrayList(1); + List<ServiceItem> items = new LinkedList<ServiceItem>(); Iterator iter = getServiceIdMapEntrySetIterator(); while(iter.hasNext()) { Map.Entry e = (Map.Entry)iter.next(); @@ -1757,10 +1796,8 @@ public class ServiceDiscoveryManager { * @param reg a ProxyReg to add. 
*/ public void addProxyReg(ProxyReg reg) { - RegisterListenerTask treg; - synchronized(serviceIdMap) { - treg = new RegisterListenerTask(reg, taskSeqN++); - }//end sync(serviceIdMap) + RegisterListenerTask treg = + new RegisterListenerTask(reg, taskSeqN.getAndIncrement()); cacheTaskMgr.add(treg); }//end LookupCacheImpl.addProxyReg @@ -1770,20 +1807,18 @@ public class ServiceDiscoveryManager { */ public void removeProxyReg(ProxyReg reg) { ProxyRegDropTask t; - synchronized(serviceIdMap) { - //let the ProxyRegDropTask do the eventRegMap.remove - EventReg eReg = (EventReg)eventRegMap.get(reg); - if(eReg != null) { - try { - leaseRenewalMgr.remove(eReg.lease); - } catch(Exception e) { - logger.log(Level.FINER, - "exception occurred while removing an " - +"event registration lease", e); - } - }//endif - t = new ProxyRegDropTask(reg, taskSeqN++); - }//end sync(serviceIdMap) + //let the ProxyRegDropTask do the eventRegMap.remove + EventReg eReg = eventRegMap.get(reg); + if(eReg != null) { + try { + leaseRenewalMgr.remove(eReg.lease); + } catch(Exception e) { + logger.log(Level.FINER, + "exception occurred while removing an " + +"event registration lease", e); + } + }//endif + t = new ProxyRegDropTask(reg, taskSeqN.getAndIncrement()); removeUselessTask(reg); cacheTaskMgr.add(t); }//end LookupCacheImpl.removeProxyReg @@ -1793,12 +1828,10 @@ public class ServiceDiscoveryManager { */ private void checkCacheTerminated() { checkTerminated(); - synchronized(serviceIdMap) { - if(bCacheTerminated) { - throw new IllegalStateException - ("this lookup cache was terminated"); - }//endif - }//end sync(serviceIdMap) + if(bCacheTerminated) { + throw new IllegalStateException + ("this lookup cache was terminated"); + }//endif }//end LookupCacheImpl.checkCacheTerminated /** Called by the lookupListener's notify() method. 
Checks the event @@ -1846,41 +1879,41 @@ public class ServiceDiscoveryManager { int transition) { if(eventSource == null) return; - synchronized(serviceIdMap) { - /* Search eventRegMap for ProxyReg corresponding to event. */ - ProxyReg reg = null; - EventReg eReg = null; - Set set = eventRegMap.entrySet(); - Iterator iter = set.iterator(); - while(iter.hasNext()) { - Map.Entry e = (Map.Entry)iter.next(); - eReg = (EventReg)e.getValue(); - if( eventSource.equals(eReg.source) - && (eventID == eReg.eventID) ) - { - reg = (ProxyReg)e.getKey(); - break; - }//endif - }//end loop - if(reg == null) return;//event arrived before eventReg in map + /* Search eventRegMap for ProxyReg corresponding to event. */ + ProxyReg reg = null; + EventReg eReg = null; + Set set = eventRegMap.entrySet(); + Iterator iter = set.iterator(); + while(iter.hasNext()) { + Map.Entry e = (Map.Entry)iter.next(); + eReg = (EventReg)e.getValue(); + if( eventSource.equals(eReg.source) + && (eventID == eReg.eventID) ) + { + reg = (ProxyReg)e.getKey(); + break; + }//endif + }//end loop + if(reg == null) return;//event arrived before eventReg in map - /* Next, look for gaps in the event sequence. */ + /* Next, look for gaps in the event sequence. 
*/ + synchronized (eReg){ long prevSeqNo = eReg.seqNo; eReg.seqNo = seqNo; CacheTask t; if(seqNo == (prevSeqNo+1)) {//no gap, handle current event t = new NotifyEventTask - (reg, sid, item, transition, taskSeqN++); - if (eReg.lookupsPending > 0) { - eReg.pending.add(t); - return; - } - } else if (eReg.lookupsPending > 1) { - // gap in event sequence, but snapshot already pending - return; + (reg, sid, item, transition, taskSeqN.getAndIncrement()); + if (eReg.lookupsPending > 0) { + eReg.pending.add(t); + return; + } + } else if (eReg.lookupsPending > 1) { + // gap in event sequence, but snapshot already pending + return; } else {//gap in event sequence, request snapshot - eReg.lookupsPending++; - t = new LookupTask(reg, taskSeqN++, eReg); + eReg.lookupsPending++; + t = new LookupTask(reg, taskSeqN.getAndIncrement(), eReg); if( logger.isLoggable(Levels.HANDLED) ) { StringBuilder sb = new StringBuilder(300); sb.append("notifyServiceMap - GAP in event sequence ") @@ -1903,7 +1936,7 @@ public class ServiceDiscoveryManager { }//endif }//endif cacheTaskMgr.add(t); - }//end sync(serviceIdMap) + } //end sync(eReg) }//end LookupCacheImpl.notifyServiceMap /** Removes from the cache's task manager, all pending tasks @@ -1942,9 +1975,7 @@ public class ServiceDiscoveryManager { /** Removes an entry in the serviceIdMap, but sends no notification. 
*/ private void removeServiceIdMapSendNoEvent(ServiceID sid) { - synchronized(serviceIdMap) { - serviceIdMap.remove(sid); - } + serviceIdMap.remove(sid); }//end LookupCacheImpl.removeServiceIdMapSendNoEvent /** Returns the element in the given items array having the given @@ -2239,14 +2270,12 @@ public class ServiceDiscoveryManager { serviceDiscardTimerTaskMgr = new TaskManager (10,(15*1000),1.0f); } - ArrayList set; - synchronized(proxyRegSet) { - set = new ArrayList(proxyRegSet); - }//end sync(proxyRegSet) - for(int i=0; i<set.size(); i++) { - ProxyReg reg = (ProxyReg)set.get(i); - addProxyReg(reg); - }//end loop + // Moved here from constructor to avoid publishing this reference + lookupListenerProxy = lookupListener.export(); + Iterator<ProxyReg> it = proxyRegSet.iterator(); + while (it.hasNext()){ + addProxyReg(it.next()); + } }//end LookupCacheImpl.initCache /** Applies the first-stage <code>filter</code> associated with @@ -2292,10 +2321,7 @@ public class ServiceDiscoveryManager { /* Handle filter fail */ if(!pass) { ServiceID srvcID = item.serviceID; - ServiceItemReg itemReg = null; - synchronized(serviceIdMap) { - itemReg = (ServiceItemReg)serviceIdMap.get(srvcID); - }//end sync(serviceIdMap) + ServiceItemReg itemReg = serviceIdMap.get(srvcID); if(itemReg != null) { if(sendEvent) { ServiceItem oldFilteredItem; @@ -2337,10 +2363,7 @@ public class ServiceDiscoveryManager { private void addFilteredItemToMap(ServiceItem item, ServiceItem filteredItem) { - ServiceItemReg itemReg = null; - synchronized(serviceIdMap) { - itemReg = (ServiceItemReg)serviceIdMap.get(item.serviceID); - }//end sync(serviceIdMap) + ServiceItemReg itemReg = serviceIdMap.get(item.serviceID); if(itemReg == null) return; boolean itemRegIsDiscarded; synchronized(itemReg) { @@ -2365,10 +2388,7 @@ public class ServiceDiscoveryManager { private void discardRetryLater(ServiceItem item, ServiceRegistrar proxy, boolean sendEvent) { - ServiceItemReg itemReg = null; - synchronized(serviceIdMap) { - 
itemReg = (ServiceItemReg)serviceIdMap.get(item.serviceID); - }//end sync(serviceIdMap) + ServiceItemReg itemReg = serviceIdMap.get(item.serviceID); if(itemReg == null) return; ServiceItem oldFilteredItem; synchronized(itemReg) { @@ -2400,10 +2420,7 @@ public class ServiceDiscoveryManager { ServiceID srvcID, ServiceItem item) { - ServiceItemReg itemReg = null; - synchronized(serviceIdMap) { - itemReg = (ServiceItemReg)serviceIdMap.get(srvcID); - }//end sync(serviceIdMap) + ServiceItemReg itemReg = serviceIdMap.get(srvcID); if(itemReg != null) { ServiceItem newItem; boolean itemRegHasNoProxys; @@ -2462,16 +2479,20 @@ public class ServiceDiscoveryManager { /* The LeaseRenewalManager to use (passed in, or create one). */ private final LeaseRenewalManager leaseRenewalMgr; /* Contains all of the discovered lookup services (ServiceRegistrar). */ - private final List<ProxyReg> proxyRegSet = new ArrayList<ProxyReg>(1); + private final Set<ProxyReg> proxyRegSet = Collections.newSetFromMap(new ConcurrentHashMap<ProxyReg,Boolean>()); /* Contains all of the DiscoveryListener's employed in lookup discovery. */ - private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>(1); + private final List<DiscoveryListener> listeners = new CopyOnWriteArrayList<DiscoveryListener>(); /* Random number generator for use in lookup. */ private final Random random = new Random(); /* Contains all of the instances of LookupCache that are requested. */ - private final ArrayList<LookupCache> caches = new ArrayList<LookupCache>(1); + private final List<LookupCache> caches = new CopyOnWriteArrayList<LookupCache>(); /* Flag to indicate if the ServiceDiscoveryManager has been terminated. */ - private boolean bTerminated = false; + private volatile boolean bTerminated = false; + + private final LookupCacheTerminator terminator; + + private volatile boolean started = false; /* Object used to obtain the configuration items for this utility. 
*/ private final Configuration thisConfig; /* Preparer for the proxies to the lookup services that are discovered @@ -2511,20 +2532,14 @@ public class ServiceDiscoveryManager { continue; } ProxyReg reg = new ProxyReg(proxys[i]); - synchronized(proxyRegSet) { - proxyRegSet.add(reg); - newProxys.add(reg); - }//end sync(proxyRegSet) + proxyRegSet.add(reg); + newProxys.add(reg); }//end loop - synchronized(listeners) { - if(!listeners.isEmpty()) - notifies = new ArrayList<DiscoveryListener>(listeners); - }//end sync(listeners) Iterator<ProxyReg> iter = newProxys.iterator(); while(iter.hasNext()) { ProxyReg reg = iter.next(); cacheAddProxy(reg); - if(notifies != null) listenerDiscovered(reg.getProxy(), notifies); + if(!listeners.isEmpty()) listenerDiscovered(reg.getProxy(), listeners); }//end loop }//end DiscMgrListener.discovered @@ -2532,31 +2547,23 @@ public class ServiceDiscoveryManager { public void discarded(DiscoveryEvent e) { ServiceRegistrar[] proxys = e.getRegistrars(); List<DiscoveryListener> notifies; - List<ProxyReg> drops = new ArrayList<ProxyReg>(1); - synchronized(proxyRegSet) { - for(int i=0; i<proxys.length; i++) { - ProxyReg reg = findReg(proxys[i]); - if(reg != null ) { // this check can be removed. - proxyRegSet.remove(proxyRegSet.indexOf(reg)); - drops.add(reg); - } else { - //River-337 - logger.severe("discard error, proxy was null"); - //throw new RuntimeException("discard error"); - }//endif - }//end loop - }//end sync(proxyRegSet) + List<ProxyReg> drops = new LinkedList<ProxyReg>(); + for(int i=0; i<proxys.length; i++) { + ProxyReg reg = removeReg(proxys[i]); + if(reg != null ) { // this check can be removed. 
+ drops.add(reg); + } else { + //River-337 + logger.severe("discard error, proxy was null"); + //throw new RuntimeException("discard error"); + }//endif + }//end loop Iterator<ProxyReg> iter = drops.iterator(); while(iter.hasNext()) { dropProxy(iter.next()); }//end loop if (!drops.isEmpty()){ - notifies = new LinkedList<DiscoveryListener>(); - synchronized(listeners) { - if(listeners.isEmpty()) return; - notifies.addAll(listeners); - }//end sync(listeners) - listenerDropped(drops, notifies); + listenerDropped(drops, listeners); } }//end DiscMgrListener.discarded @@ -2569,24 +2576,20 @@ public class ServiceDiscoveryManager { /** Adds the given proxy to all the caches maintained by the SDM. */ private void cacheAddProxy(ProxyReg reg) { - synchronized(caches) { - Iterator iter = caches.iterator(); - while (iter.hasNext()) { - LookupCacheImpl cache = (LookupCacheImpl)iter.next(); - cache.addProxyReg(reg); - }//end loop - } + Iterator iter = caches.iterator(); + while (iter.hasNext()) { + LookupCacheImpl cache = (LookupCacheImpl)iter.next(); + cache.addProxyReg(reg); + }//end loop }//end cacheAddProxy /** Removes the given proxy from all the caches maintained by the SDM. 
*/ private void dropProxy(ProxyReg reg ) { - synchronized(caches) { - Iterator iter = caches.iterator(); - while (iter.hasNext()) { - LookupCacheImpl cache= (LookupCacheImpl)iter.next(); - cache.removeProxyReg(reg); - }//end loop - } + Iterator iter = caches.iterator(); + while (iter.hasNext()) { + LookupCacheImpl cache= (LookupCacheImpl)iter.next(); + cache.removeProxyReg(reg); + }//end loop }//end dropProxy /** @@ -2843,6 +2846,7 @@ public class ServiceDiscoveryManager { discMgrInternal = init.discMgrInternal; discMgrListener = new DiscMgrListener(); discMgr.addDiscoveryListener(discMgrListener); + terminator = new LookupCacheTerminator(); } /** Sends discarded event to each listener waiting for discarded lookups.*/ @@ -2875,16 +2879,13 @@ public class ServiceDiscoveryManager { /** Returns array of ServiceRegistrar created from the proxyRegSet */ private ServiceRegistrar[] buildServiceRegistrar() { - synchronized (proxyRegSet){ - int k = 0; - ServiceRegistrar[] proxys = new ServiceRegistrar[proxyRegSet.size()]; - Iterator<ProxyReg> iter = proxyRegSet.iterator(); - while(iter.hasNext()) { - ProxyReg reg = iter.next(); - proxys[k++] = reg.getProxy(); - }//end loop - return proxys; - } + List<ServiceRegistrar> proxys = new LinkedList<ServiceRegistrar>(); + Iterator<ProxyReg> iter = proxyRegSet.iterator(); + while(iter.hasNext()) { + ProxyReg reg = iter.next(); + proxys.add(reg.getProxy()); + }//end loop + return proxys.toArray(new ServiceRegistrar[proxys.size()]); }//end buildServiceRegistrar /** @@ -3110,7 +3111,7 @@ public class ServiceDiscoveryManager { }//end sync(cacheListener) return sm; } finally { - if(cache != null) cache.terminate(); + if(cache != null) terminator.terminate(cache); } }//end lookup @@ -3254,22 +3255,13 @@ public class ServiceDiscoveryManager { discMgr.removeDiscoveryListener(discMgrListener); if(discMgrInternal) discMgr.terminate(); }//end sync + terminator.interrupt(); /* Terminate all caches: cancel event leases, un-export listeners */ - 
boolean terminateCaches = false; - ArrayList cachesClone = null; - synchronized(caches) { - if( !caches.isEmpty() ) { - terminateCaches = true; - cachesClone = (ArrayList)caches.clone(); - } - }//end sync - if(terminateCaches) { - Iterator iter = cachesClone.iterator(); - while (iter.hasNext()) { - LookupCacheImpl cache = (LookupCacheImpl)iter.next(); - cache.terminate(); - }//end loop - }//endif(terminateCaches) + Iterator iter = caches.iterator(); + while (iter.hasNext()) { + LookupCacheImpl cache = (LookupCacheImpl)iter.next(); + cache.terminate(); + }//end loop }//end terminate /** @@ -3554,13 +3546,17 @@ public class ServiceDiscoveryManager { throw new IllegalArgumentException ("maxMatches must be > minMatches"); + long delay = System.currentTimeMillis(); ServiceItem [] sItems = lookup(tmpl, maxMatches, filter); if(sItems.length >= minMatches) return sItems; - ArrayList sItemSet = new ArrayList(sItems.length); + List<ServiceItem> sItemSet = new LinkedList<ServiceItem>(); for(int i=0; i<sItems.length; i++) { //if(!sItemSet.contains(sItems[i]) - sItemSet.add(sItems[i]); - }//end loop + //sItemSet.add(sItems[i]); + if(!isArrayContainsServiceItem(sItemSet, sItems[i])) { + sItemSet.add(sItems[i]); + }//endif + }//end loop ServiceDiscoveryListenerImpl cacheListener = new ServiceDiscoveryListenerImpl(); /* The cache must be created inside the listener sync block, @@ -3581,11 +3577,12 @@ public class ServiceDiscoveryManager { * released until the wait() method is invoked. */ LookupCacheImpl cache = null; - synchronized(cacheListener) { + synchronized(cacheListener) { // uncontended lock. + delay = (System.currentTimeMillis() - delay) + 1; // Calculate initial time delay in ms. cache = createLookupCache(tmpl,filter,cacheListener,waitDur); long duration = cache.getLeaseDuration(); - while ( duration > 0 ) { - cacheListener.wait(duration); + while ( duration > delay ) { // Some milli's to spare to ensure we return in reasonable time. 
+ cacheListener.wait(duration - delay); ServiceItem items[] = cacheListener.getServiceItem(); for(int i=0; i<items.length; i++) { if(!isArrayContainsServiceItem(sItemSet, items[i])) { @@ -3596,10 +3593,22 @@ public class ServiceDiscoveryManager { duration = cache.getLeaseDuration(); }//end loop }//end sync(cacheListener) - cache.terminate(); - ServiceItem [] r = new ServiceItem[sItemSet.size()]; - sItemSet.toArray(r); - return r; + // Termination is now performed by a dedicated thread to ensure + // Remote method call doesn't take too long. + terminator.terminate(cache); + if (sItemSet.size() > maxMatches){ + // Discard some matches + ServiceItem[] r = new ServiceItem[maxMatches]; + // Iterator is faster for LinkedList. + Iterator<ServiceItem> it = sItemSet.iterator(); + for ( int i = 0; it.hasNext() && i < maxMatches; i++){ + r[i] = it.next(); + } + return r; + } + ServiceItem [] r = new ServiceItem[sItemSet.size()]; + sItemSet.toArray(r); + return r; }//end lookup /** From the given set of ServiceMatches, randomly selects and returns @@ -3629,25 +3638,30 @@ public class ServiceDiscoveryManager { long leaseDuration) throws RemoteException { + if (!started) terminator.start(); if(tmpl == null) tmpl = new ServiceTemplate(null, null, null); LookupCacheImpl cache = new LookupCacheImpl(tmpl, filter, listener, leaseDuration); cache.initCache(); - synchronized(caches) { - caches.add(cache); - } + caches.add(cache); logger.finest("ServiceDiscoveryManager - LookupCache created"); return cache; }//end createLookupCache - /** Returns element from proxyRegSet that corresponds to the given proxy.*/ - private ProxyReg findReg(ServiceRegistrar proxy) { - Iterator iter = proxyRegSet.iterator(); + /** Removes and returns element from proxyRegSet that corresponds to the given proxy.*/ + private ProxyReg removeReg(ServiceRegistrar proxy){ + Iterator<ProxyReg> iter = proxyRegSet.iterator(); while(iter.hasNext()) { - ProxyReg reg =(ProxyReg)iter.next(); - 
if(reg.getProxy().equals(proxy)) return reg; + ProxyReg reg = iter.next(); + // ProxyReg hashcode is same as proxy - optimisation + if (reg.hashCode() == proxy.hashCode()){ + if(reg.getProxy().equals(proxy)){ + iter.remove(); + return reg; + } + } }//end loop return null; - }//end findReg + }//end removeReg /** Convenience method invoked when failure occurs in the cache * tasks executed in this utility. If the appropriate logging level @@ -3769,12 +3783,10 @@ public class ServiceDiscoveryManager { * ServiceDiscoveryManager has been terminated. */ private void checkTerminated() { - synchronized(this) { - if(bTerminated) { - throw new IllegalStateException - ("service discovery manager was terminated"); - }//endif - }//end sync + if(bTerminated) { + throw new IllegalStateException + ("service discovery manager was terminated"); + }//endif }//end checkTerminated /** Returns a "non-shallow" (not just a clone) copy of the given
