Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java?rev=1554723&r1=1554722&r2=1554723&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java Thu Jan 2 02:45:07 2014 @@ -62,8 +62,17 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -83,6 +92,7 @@ import net.jini.security.BasicProxyPrepa import net.jini.security.ProxyPreparer; import net.jini.security.Security; import net.jini.security.SecurityContext; +import org.apache.river.impl.thread.NamedThreadFactory; /** @@ -101,8 +111,8 @@ abstract class AbstractLookupDiscovery i /* Logger used by this utility. */ private static final Logger logger = Logger.getLogger(COMPONENT_NAME); - /** Maximum number of concurrent tasks that can be run in any task - * manager created by this class. + /** Maximum number of concurrent tasks that can be run in any executor + * created by this class. */ private static final int MAX_N_TASKS = 15; /** Default maximum size of multicast packets to send and receive. */ @@ -115,13 +125,13 @@ abstract class AbstractLookupDiscovery i /** Flag indicating whether or not this class is still functional. */ private volatile boolean terminated = false; /** Set of listeners to be sent discovered/discarded/changed events. Access sync on registrars */ - private final ArrayList<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>(1); + private final List<DiscoveryListener> listeners = new CopyOnWriteArrayList<DiscoveryListener>(); /** The groups to discover. Empty set -- NO_GROUPS, access synchronised on registrars */ private final Set<String> groups; /** If groups passed to constructor are null -- ALL_GROUPS, writes synchronised on registrars */ private volatile boolean all_groups; /** Map from ServiceID to UnicastResponse. */ - private final Map<ServiceID,UnicastResponse> registrars = new HashMap<ServiceID,UnicastResponse>(11); + private final Map<ServiceID,UnicastResponse> registrars = new ConcurrentHashMap<ServiceID,UnicastResponse>(11); /** * Set that takes one of the following: * <p><ul> @@ -136,12 +146,12 @@ abstract class AbstractLookupDiscovery i * of the element, determines the processing to perform and what event * type to send to the registered listeners. */ - private final Set pendingDiscoveries = new HashSet(11); + private final Set pendingDiscoveries = Collections.newSetFromMap(new ConcurrentHashMap()); /** Thread that handles pending notifications. */ private final Notifier notifierThread; /** Notifications to be sent to listeners. Synchronised access with lock notify */ - private final Deque<NotifyTask> pendingNotifies = new LinkedList<NotifyTask>(); - /** Task manager for running UnicastDiscoveryTasks and + private final BlockingDeque<NotifyTask> pendingNotifies = new LinkedBlockingDeque<NotifyTask>(); + /** ExecutorService for running UnicastDiscoveryTasks and * DecodeAnnouncementTasks. */ private final ExecutorService executor; @@ -176,7 +186,7 @@ abstract class AbstractLookupDiscovery i * * Access synchronised on registrars. */ - private final HashMap<ServiceID,AnnouncementInfo> regInfo = new HashMap<ServiceID,AnnouncementInfo>(11); + private final ConcurrentMap<ServiceID,AnnouncementInfo> regInfo = new ConcurrentHashMap<ServiceID,AnnouncementInfo>(11); /** Thread that monitors multicast announcements from already-discovered * lookup services and, upon determining that those announcements have * stopped, queues a reachability test with the UnicastDiscoveryTask @@ -295,18 +305,13 @@ abstract class AbstractLookupDiscovery i logger.finest("LookupDiscovery - Notifier thread started"); while (!interrupted()) { final NotifyTask task; - synchronized (pendingNotifies) { - if (pendingNotifies.isEmpty()) { - try { - pendingNotifies.wait(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); // restore - break; - } - }//endif - task = pendingNotifies.removeFirst(); - if (task == null) continue; // spurious wakeup. - }//end sync + try { + task = pendingNotifies.takeFirst(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); // restore + break; + } + /* The call to notify() on the registered listeners is * performed inside a doPrivileged block that restores the * access control context that was in place when this utility @@ -661,10 +666,8 @@ abstract class AbstractLookupDiscovery i } catch (IOException e) { } break; }//end if - synchronized (pendingDiscoveries) { - pendingDiscoveries.add(sock); - restoreContextAddTask(new UnicastDiscoveryTask(sock)); - }//end sync + pendingDiscoveries.add(sock); + restoreContextAddTask(new UnicastDiscoveryTask(sock)); } catch (InterruptedIOException e) { break; } catch (Exception e) {//ignore @@ -815,33 +818,29 @@ abstract class AbstractLookupDiscovery i while(!isInterrupted()) { sleep(multicastAnnouncementInterval); long curTime = System.currentTimeMillis(); - synchronized (registrars) { - /* Previously regInfo was cloned to avoid synchronizing - * this was changed to a simple synchronized block during code - * auditing as it appeared like an unnecessary - * performance optimisation that risked atomicity. - */ - Set<Map.Entry<ServiceID,AnnouncementInfo>> eSet = regInfo.entrySet(); - for(Iterator<Map.Entry<ServiceID,AnnouncementInfo>> itr = eSet.iterator(); itr.hasNext(); ) { - Map.Entry<ServiceID,AnnouncementInfo> pair = itr.next(); - ServiceID srvcID = pair.getKey(); - long tStamp = pair.getValue().gettStamp(); - long deltaT = curTime - tStamp; - if(deltaT > timeThreshold) { - /* announcements stopped, queue reachability - * test and potential discarded event - */ - UnicastResponse resp = registrars.get(srvcID); - Object req = new CheckReachabilityMarker(resp); - synchronized (pendingDiscoveries) { - if(pendingDiscoveries.add(req)) { - restoreContextAddTask( - new UnicastDiscoveryTask(req)); - }//endif - }//end sync - }//end if - }//end loop (itr) - }//end sync + /* Previously regInfo was cloned to avoid synchronizing + * this was changed to a simple synchronized block during code + * auditing as it appeared like an unnecessary + * performance optimisation that risked atomicity. + */ + Set<Map.Entry<ServiceID,AnnouncementInfo>> eSet = regInfo.entrySet(); + for(Iterator<Map.Entry<ServiceID,AnnouncementInfo>> itr = eSet.iterator(); itr.hasNext(); ) { + Map.Entry<ServiceID,AnnouncementInfo> pair = itr.next(); + ServiceID srvcID = pair.getKey(); + long tStamp = pair.getValue().gettStamp(); + long deltaT = curTime - tStamp; + if(deltaT > timeThreshold) { + /* announcements stopped, queue reachability + * test and potential discarded event + */ + UnicastResponse resp = registrars.get(srvcID); + Object req = new CheckReachabilityMarker(resp); + if(pendingDiscoveries.add(req)) { + restoreContextAddTask( + new UnicastDiscoveryTask(req)); + }//endif + }//end if + }//end loop (itr) }//end loop (!isInterrupted) } catch (InterruptedException e) { } }//end run @@ -960,30 +959,49 @@ abstract class AbstractLookupDiscovery i */ Object pending = null; ServiceID srvcID = ann.getServiceID(); - synchronized (registrars) { - UnicastResponse resp = registrars.get(srvcID); - if (resp != null) { - // already in discovered set, timestamp announcement - AnnouncementInfo aInfo = regInfo.get(srvcID); - aInfo = new AnnouncementInfo( System.currentTimeMillis(), aInfo.getSeqNum()); - regInfo.put(srvcID, aInfo); - long currNum = ann.getSequenceNumber(); - if ((newSeqNum(currNum, aInfo.getSeqNum())) && - (!groupSetsEqual(resp.getGroups(), ann.getGroups()))) { - /* Check if the groups have changed. In the case of - * split announcement messages, eventually, group difference - * will be seen for the given sequence number. This - * check ignores other differences, such as port numbers, - * but for the purposes of LookupDiscovery, this is not - * important. - */ - pending = new CheckGroupsMarker(ann); - } - } else if (groupsOverlap(ann.getGroups())) { - // newly discovered - pending = new LookupLocator(ann.getHost(), ann.getPort()); - } - } + UnicastResponse resp = registrars.get(srvcID); + if (resp != null) { + // already in discovered set, timestamp announcement + AnnouncementInfo aInfo = regInfo.get(srvcID); + if (aInfo == null){ + // Possible interleaved putIfAbsent + aInfo = new AnnouncementInfo( System.currentTimeMillis(), -1); + AnnouncementInfo existed = regInfo.putIfAbsent(srvcID, aInfo); + if (existed != null) { + // The time will be almost identical so reuse existing. + aInfo = existed; + } + } else { + AnnouncementInfo newAInfo = new AnnouncementInfo( System.currentTimeMillis(), aInfo.getSeqNum()); + if (regInfo.replace(srvcID, aInfo, newAInfo)){ + aInfo = newAInfo; + } else { + // May have been removed or replaced already + AnnouncementInfo existed = regInfo.putIfAbsent(srvcID, newAInfo); + if (existed != null){ + aInfo = existed; + } else { + // Was absent + aInfo = newAInfo; + } + } + } + long currNum = ann.getSequenceNumber(); + if ((newSeqNum(currNum, aInfo.getSeqNum())) && + (!groupSetsEqual(resp.getGroups(), ann.getGroups()))) { + /* Check if the groups have changed. In the case of + * split announcement messages, eventually, group difference + * will be seen for the given sequence number. This + * check ignores other differences, such as port numbers, + * but for the purposes of LookupDiscovery, this is not + * important. + */ + pending = new CheckGroupsMarker(ann); + } + } else if (groupsOverlap(ann.getGroups())) { + // newly discovered + pending = new LookupLocator(ann.getHost(), ann.getPort()); + } if (pending != null) { try { checkAnnouncementConstraints(ann); @@ -995,20 +1013,24 @@ abstract class AbstractLookupDiscovery i return; } if (pending instanceof CheckGroupsMarker) { - synchronized(registrars) { - // Since this is a valid announcement, update the - // sequence number. - AnnouncementInfo aInfo = regInfo.get(srvcID); - aInfo = new AnnouncementInfo( aInfo.gettStamp(), ann.getSequenceNumber()); - regInfo.put(srvcID, aInfo); - } + // Since this is a valid announcement, update the + // sequence number. + AnnouncementInfo aInfo = regInfo.get(srvcID); + if (!regInfo.replace + ( srvcID, + aInfo, + new AnnouncementInfo( + aInfo.gettStamp(), + ann.getSequenceNumber() + ) + ) + ) + { + logger.fine("aInfo changed and was not replaced"); + } } - boolean added; // enqueue and handle pending action, if not already enqueued - synchronized (pendingDiscoveries) { - added = pendingDiscoveries.add(pending); - } - if (added) { + if (pendingDiscoveries.add(pending)) { if (unicastDelayRange <= 0) { new UnicastDiscoveryTask(pending).run(); } else { @@ -1018,7 +1040,7 @@ abstract class AbstractLookupDiscovery i synchronized (ud) { ud.ticket = t; ud.delayRun = false; - synchronized (pendingDiscoveries) { + synchronized (tickets) { tickets.add(t); } ud.notifyAll(); @@ -1134,7 +1156,7 @@ abstract class AbstractLookupDiscovery i while (delayRun) { this.wait(); } - synchronized (pendingDiscoveries) { + synchronized (tickets) { // If this was run by a WakeupManager, remove its // ticket from the list of outstanding tickets. if (ticket != null) { @@ -1192,10 +1214,7 @@ abstract class AbstractLookupDiscovery i // handle group changes announcement = ((CheckGroupsMarker)req).announcement; ServiceID srvcID = announcement.getServiceID(); - UnicastResponse resp = null; - synchronized (registrars) { - resp = registrars.get(srvcID); - } + UnicastResponse resp = registrars.get(srvcID); if(resp != null) { maybeSendEvent(resp, announcement.getGroups()); }//endif @@ -1251,9 +1270,7 @@ abstract class AbstractLookupDiscovery i } finally { // Done with the request. Remove it regardless of // if we succeeded or failed. - synchronized (pendingDiscoveries) { - pendingDiscoveries.remove(req); - } + pendingDiscoveries.remove(req); }//end try/catch logger.finest("LookupDiscovery - UnicastDiscoveryTask completed"); }//end run @@ -1404,9 +1421,14 @@ abstract class AbstractLookupDiscovery i /* ExecutorService */ ExecutorService executorServ; try { - executorServ = (ExecutorService) config.getEntry(COMPONENT_NAME, "executorService", ExecutorService.class); + executorServ = (ExecutorService) config.getEntry(COMPONENT_NAME, + "executorService", ExecutorService.class); } catch (NoSuchEntryException e) { /* use default */ - executorServ = Executors.newFixedThreadPool(MAX_N_TASKS); + executorServ = + new ThreadPoolExecutor(1, MAX_N_TASKS , + 15L, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("LookupDiscovery", false)); } this.executor = executorServ; @@ -1533,14 +1555,13 @@ abstract class AbstractLookupDiscovery i private AbstractLookupDiscovery(String[] groups, Initializer init) { - if (groups != null) { - this.groups = new HashSet(groups.length * 2); + this.groups = new ConcurrentSkipListSet(); + if (groups != null) { for (int i = 0; i < groups.length; i++) { this.groups.add(groups[i]); }//end loop all_groups = false; } else { - this.groups = new HashSet(); all_groups = true; } @@ -1606,7 +1627,7 @@ abstract class AbstractLookupDiscovery i }//end constructor - /* sync on registars */ + /* sync on this */ private boolean started = false; /** @@ -1619,7 +1640,7 @@ abstract class AbstractLookupDiscovery i * @since 2.2.1 */ void start() throws IOException { - synchronized (registrars){ + synchronized (this){ if (started) return; if (thrown != null) throw (IOException) thrown; if (!all_groups || !this.groups.isEmpty()) { @@ -1655,23 +1676,21 @@ abstract class AbstractLookupDiscovery i if(l == null) { throw new NullPointerException("can't add null listener"); } - synchronized (registrars) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - if (listeners.indexOf(l) >= 0) return; //already have this listener - listeners.add(l); - if (registrars.isEmpty()) return;//nothing to send the new listener - HashMap groupsMap = new HashMap(registrars.size()); - Iterator iter = registrars.values().iterator(); - while (iter.hasNext()) { - UnicastResponse resp = (UnicastResponse)iter.next(); - groupsMap.put(resp.getRegistrar(),resp.getGroups()); - } - ArrayList list = new ArrayList(1); - list.add(l); - addNotify(list, groupsMap, DISCOVERED); - } + if (terminated) { + throw new IllegalStateException("discovery terminated"); + } + if (listeners.indexOf(l) >= 0) return; //already have this listener + listeners.add(l); + if (registrars.isEmpty()) return;//nothing to send the new listener + HashMap groupsMap = new HashMap(registrars.size()); + Iterator iter = registrars.values().iterator(); + while (iter.hasNext()) { + UnicastResponse resp = (UnicastResponse)iter.next(); + groupsMap.put(resp.getRegistrar(),resp.getGroups()); + } + List list = new ArrayList(1); + list.add(l); + addNotify(list, groupsMap, DISCOVERED); }//end addDiscoveryListener /** @@ -1687,12 +1706,10 @@ abstract class AbstractLookupDiscovery i * @see #addDiscoveryListener */ public void removeDiscoveryListener(DiscoveryListener l) { - synchronized (registrars) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - listeners.remove(l); - } + if (terminated) { + throw new IllegalStateException("discovery terminated"); + } + listeners.remove(l); }//end removeDiscoveryListener /** @@ -1712,20 +1729,18 @@ abstract class AbstractLookupDiscovery i * @see net.jini.discovery.DiscoveryManagement#removeDiscoveryListener */ public ServiceRegistrar[] getRegistrars() { - synchronized (registrars) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - if (registrars.isEmpty()) { - return new ServiceRegistrar[0]; - } - Iterator iter = registrars.values().iterator(); - ServiceRegistrar[] regs = new ServiceRegistrar[registrars.size()]; - for (int i=0;iter.hasNext();i++) { - regs[i] = ((UnicastResponse)iter.next()).getRegistrar(); - } - return regs; + if (terminated) { + throw new IllegalStateException("discovery terminated"); + } + if (registrars.isEmpty()) { + return new ServiceRegistrar[0]; } + Iterator iter = registrars.values().iterator(); + ServiceRegistrar[] regs = new ServiceRegistrar[registrars.size()]; + for (int i=0;iter.hasNext();i++) { + regs[i] = ((UnicastResponse)iter.next()).getRegistrar(); + } + return regs; }//end getRegistrars /** @@ -1747,21 +1762,17 @@ abstract class AbstractLookupDiscovery i * @see DiscoveryListener#discarded */ public void discard(ServiceRegistrar reg) { - synchronized (registrars) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - if(reg == null) return; - sendDiscarded(reg,null); - }//end sync + if (terminated) { + throw new IllegalStateException("discovery terminated"); + } + if(reg == null) return; + sendDiscarded(reg,null); }//end discard /** Terminate the discovery process. */ public void terminate() { - synchronized (registrars) { - if (terminated) return; - terminated = true; - } + if (terminated) return; + terminated = true; nukeThreads(); }//end terminate @@ -1783,16 +1794,14 @@ abstract class AbstractLookupDiscovery i * @see #setGroups */ public String[] getGroups() { - synchronized (registrars) { - if (terminated) { - throw new IllegalStateException("discovery terminated"); - } - if (all_groups) - return ALL_GROUPS; - if (groups.isEmpty()) - return NO_GROUPS; - return collectionToStrings(groups); - } + if (terminated) { + throw new IllegalStateException("discovery terminated"); + } + if (all_groups) + return ALL_GROUPS; + if (groups.isEmpty()) + return NO_GROUPS; + return collectionToStrings(groups); }//end getGroups /** @@ -1816,20 +1825,18 @@ abstract class AbstractLookupDiscovery i public void addGroups(String[] newGroups) throws IOException { testArrayForNullElement(newGroups); checkGroups(newGroups); - synchronized (registrars) { - if (terminated) - throw new IllegalStateException("discovery terminated"); - if (all_groups) - throw new UnsupportedOperationException( - "can't add to \"any groups\""); - Collection req = new ArrayList(newGroups.length); - for (int i = 0; i < newGroups.length; i++) { - if (groups.add(newGroups[i])) - req.add(newGroups[i]); - } - if (!req.isEmpty()) - requestGroups(req); - } + if (terminated) + throw new IllegalStateException("discovery terminated"); + if (all_groups) + throw new UnsupportedOperationException( + "can't add to \"any groups\""); + Collection req = new ArrayList(newGroups.length); + for (int i = 0; i < newGroups.length; i++) { + if (groups.add(newGroups[i])) + req.add(newGroups[i]); + } + if (!req.isEmpty()) + requestGroups(req); }//end addGroups /** @@ -1870,40 +1877,38 @@ abstract class AbstractLookupDiscovery i newGrps.add(newGroups[i]); } } - synchronized (registrars) { - if (terminated) - throw new IllegalStateException("discovery terminated"); - if (newGroups == null) { - if (!all_groups) { - all_groups = true; - groups.clear(); - requestGroups(null); - } - return; - } - if (all_groups == true) { - all_groups = false; + if (terminated) + throw new IllegalStateException("discovery terminated"); + if (newGroups == null) { + if (!all_groups) { + all_groups = true; groups.clear(); - maybeDiscard = true; - } - Set toAdd = new HashSet(newGrps); - toAdd.removeAll(groups); - // Figure out which groups to get rid of. We start off - // with the full set for which we are already listening, - // and eliminate any that are in both the new set and the - // current set. - Collection toRemove = new HashSet(groups); - toRemove.removeAll(newGrps); - // Add new groups before we remove any old groups, because - // removeGroups will start a new round of multicast requests - // if the set of groups becomes empty, and we don't want it - // to do so without reason. - groups.addAll(toAdd); - if (!toRemove.isEmpty()) - maybeDiscard |= removeGroupsInt(collectionToStrings(toRemove)); - if (!toAdd.isEmpty()) - requestGroups(toAdd); - } + requestGroups(null); + } + return; + } + if (all_groups == true) { + all_groups = false; + groups.clear(); + maybeDiscard = true; + } + Set toAdd = new HashSet(newGrps); + toAdd.removeAll(groups); + // Figure out which groups to get rid of. We start off + // with the full set for which we are already listening, + // and eliminate any that are in both the new set and the + // current set. + Collection toRemove = new HashSet(groups); + toRemove.removeAll(newGrps); + // Add new groups before we remove any old groups, because + // removeGroups will start a new round of multicast requests + // if the set of groups becomes empty, and we don't want it + // to do so without reason. + groups.addAll(toAdd); + if (!toRemove.isEmpty()) + maybeDiscard |= removeGroupsInt(collectionToStrings(toRemove)); + if (!toAdd.isEmpty()) + requestGroups(toAdd); if (maybeDiscard) maybeDiscardRegistrars(); }//end setGroups @@ -1923,14 +1928,12 @@ abstract class AbstractLookupDiscovery i public void removeGroups(String[] oldGroups) { testArrayForNullElement(oldGroups); boolean maybeDiscard; - synchronized (registrars) { - if (terminated) - throw new IllegalStateException("discovery terminated"); - if (all_groups) - throw new UnsupportedOperationException( - "can't remove from \"any groups\""); - maybeDiscard = removeGroupsInt(oldGroups); - } + if (terminated) + throw new IllegalStateException("discovery terminated"); + if (all_groups) + throw new UnsupportedOperationException( + "can't remove from \"any groups\""); + maybeDiscard = removeGroupsInt(oldGroups); if (maybeDiscard) maybeDiscardRegistrars(); }//end removeGroups @@ -2164,10 +2167,8 @@ abstract class AbstractLookupDiscovery i /** Returns the service IDs of the lookup service(s) discovered to date. */ private ServiceID[] getServiceIDs() { - synchronized (registrars) { - return (ServiceID[]) - registrars.keySet().toArray(new ServiceID[registrars.size()]); - }//end sync + return (ServiceID[]) + registrars.keySet().toArray(new ServiceID[registrars.size()]); }//end getServiceIDs /** @@ -2253,9 +2254,7 @@ abstract class AbstractLookupDiscovery i * don't match any of the groups of interest, then don't waste time * performing an unnecessary proxy preparation; simply return. */ - synchronized(registrars) { - if( !groupsOverlap(resp.getGroups()) ) return; - }//end sync(registrars) + if( !groupsOverlap(resp.getGroups()) ) return; /* Proxy preparation - * @@ -2318,41 +2317,44 @@ abstract class AbstractLookupDiscovery i /* Add any newly discovered registrars to the managed set and notify * all listeners. */ - synchronized (registrars) { - if(groupsOverlap(resp.getGroups()) && - !registrarsEqual(resp, - (UnicastResponse) registrars.put - (resp.getRegistrar().getServiceID(), resp))) - { - /* Time stamp the service ID and store its current sequence - * number. The first time stamp associated - * with the current service ID occurs here. All other time - * stamps for that service ID will occur when multicast - * announcements for that service ID arrive (in the - * AnnouncementListener thread). - * - * Note that if the time stamp for the service ID were - * initialized upon the arrival of the first announcement, - * rather than here when it is first discovered, the - * AnnouncementTimerThread would not be able to detect the - * termination of announcements for the case where the - * termination happens to occur between the time the lookup - * is first discovered here, and the time the first - * announcement was supposed to have arrived. This can - * happen because a multicast request from the client can - * cause the lookup to be discovered before the first - * announcement arrives. - */ - regInfo.put(resp.getRegistrar().getServiceID(), - new AnnouncementInfo(System.currentTimeMillis(), -1)); - if(!listeners.isEmpty()) { - addNotify((ArrayList)listeners.clone(), - mapRegToGroups(resp.getRegistrar(), - resp.getGroups()), - DISCOVERED); - }//endif - }//endif - }//end sync(registrars) + if(groupsOverlap(resp.getGroups()) && + !registrarsEqual(resp, + (UnicastResponse) registrars.put + (resp.getRegistrar().getServiceID(), resp))) + { + /* Time stamp the service ID and store its current sequence + * number. The first time stamp associated + * with the current service ID occurs here. All other time + * stamps for that service ID will occur when multicast + * announcements for that service ID arrive (in the + * AnnouncementListener thread). + * + * Note that if the time stamp for the service ID were + * initialized upon the arrival of the first announcement, + * rather than here when it is first discovered, the + * AnnouncementTimerThread would not be able to detect the + * termination of announcements for the case where the + * termination happens to occur between the time the lookup + * is first discovered here, and the time the first + * announcement was supposed to have arrived. This can + * happen because a multicast request from the client can + * cause the lookup to be discovered before the first + * announcement arrives. + */ + AnnouncementInfo aInfo = new AnnouncementInfo(System.currentTimeMillis(), -1); + AnnouncementInfo existed = regInfo.putIfAbsent(resp.getRegistrar().getServiceID(), + aInfo); + if (existed != null){ + logger.log(Level.FINE, + "AnnouncementInfo already existed in regInfo map:\n{0}should be:\n{1}", + new Object[]{existed, aInfo}); + } + if(!listeners.isEmpty()) { + addNotify( listeners, + mapRegToGroups(resp.getRegistrar(),resp.getGroups()), + DISCOVERED); + }//endif + }//endif }//end maybeAddNewRegistrar /** Determine if any of the already-discovered registrars are no longer @@ -2361,32 +2363,29 @@ abstract class AbstractLookupDiscovery i */ @SuppressWarnings("unchecked") private void maybeDiscardRegistrars() { - synchronized (registrars) { - Map<ServiceRegistrar,String[]> groupsMap = new HashMap<ServiceRegistrar,String[]>(registrars.size()); - for(Iterator<UnicastResponse> iter=registrars.values().iterator();iter.hasNext(); ){ - UnicastResponse ent = iter.next(); - if(!groupsOverlap(ent.getGroups())) { // not interested anymore - groupsMap.put(ent.getRegistrar(),ent.getGroups()); - regInfo.remove(ent.getRegistrar().getServiceID()); - iter.remove(); // remove (srvcID,response) mapping - }//endif - }//end loop - if( !groupsMap.isEmpty() && !listeners.isEmpty() ) { - addNotify((ArrayList<DiscoveryListener>)listeners.clone(), groupsMap, DISCARDED); - }//endif - }//end sync + Map<ServiceRegistrar,String[]> groupsMap = new HashMap<ServiceRegistrar,String[]>(registrars.size()); + for(Iterator<UnicastResponse> iter=registrars.values().iterator();iter.hasNext(); ){ + UnicastResponse ent = iter.next(); + if(!groupsOverlap(ent.getGroups())) { // not interested anymore + groupsMap.put(ent.getRegistrar(),ent.getGroups()); + regInfo.remove(ent.getRegistrar().getServiceID()); + iter.remove(); // remove (srvcID,response) mapping + }//endif + }//end loop + if( !groupsMap.isEmpty() && !listeners.isEmpty() ) { + addNotify(listeners, groupsMap, DISCARDED); + }//endif }//end maybeDiscardRegistrars /** * Add a notification task to the pending queue, and wake up the Notifier. */ - private void addNotify(List<DiscoveryListener> notifies, Map<ServiceRegistrar,String[]> groupsMap, int eventType) { - synchronized (pendingNotifies) { - pendingNotifies.addLast(new NotifyTask(notifies, - groupsMap, - eventType)); - pendingNotifies.notify(); - }//end sync + private void addNotify(List<DiscoveryListener> notifies, + Map<ServiceRegistrar,String[]> groupsMap, int eventType) + { + pendingNotifies.addLast(new NotifyTask(notifies, + groupsMap, + eventType)); }//end addNotify /** Terminates (interrupts) all currently-running threads. */ @@ -2409,7 +2408,7 @@ abstract class AbstractLookupDiscovery i if(announceeThread != null) { announceeThread.interrupt(); }//endif - synchronized (pendingDiscoveries) { + synchronized (tickets) { terminateTaskMgr(); Iterator i = tickets.iterator(); while (i.hasNext()) { @@ -2440,23 +2439,19 @@ abstract class AbstractLookupDiscovery i } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } - /* Clear pendingDiscoveries and close all associated sockets */ - synchronized (pendingDiscoveries) { - for(Iterator iter = pendingDiscoveries.iterator(); - iter.hasNext();) - { - Object req = iter.next(); - iter.remove(); - if (req instanceof Socket) { - try { - ((Socket)req).close(); - } catch (IOException e) { /* ignore */ } - }//endif - }//end loop - }//end sync - synchronized(pendingNotifies) { + /* Clear pendingDiscoveries and close all associated sockets */ + for(Iterator iter = pendingDiscoveries.iterator(); + iter.hasNext();) + { + Object req = iter.next(); + iter.remove(); + if (req instanceof Socket) { + try { + ((Socket)req).close(); + } catch (IOException e) { /* ignore */ } + }//endif + }//end loop pendingNotifies.clear(); - }//end sync }//end terminateTaskMgr /** After a possible change in the member groups of the @@ -2509,17 +2504,15 @@ abstract class AbstractLookupDiscovery i if(actualGroups == null) return; // null ==> was already discarded }//endif - synchronized (registrars) { - // Other events may have occured to registrars while we were - // making our remote call. - UnicastResponse resp = - registrars.get(reg.getServiceID()); - if (resp == null) { - // The registrar was discarded in the meantime. Oh well. - return; - } - notifyOnGroupChange(reg, resp.getGroups(), actualGroups); - } + // Other events may have occured to registrars while we were + // making our remote call. + UnicastResponse resp = + registrars.get(reg.getServiceID()); + if (resp == null) { + // The registrar was discarded in the meantime. Oh well. + return; + } + notifyOnGroupChange(reg, resp.getGroups(), actualGroups); }//end maybeSendEvent /** After a possible change in the member groups of the given @@ -2589,8 +2582,7 @@ abstract class AbstractLookupDiscovery i if( registrars.remove(srvcID) != null ) { regInfo.remove(srvcID); if( !listeners.isEmpty() ) { - addNotify((ArrayList)listeners.clone(), - mapRegToGroups(reg,curGroups), DISCARDED); + addNotify( listeners, mapRegToGroups(reg,curGroups), DISCARDED); }//endif }//endif }//end sendDiscarded @@ -2616,8 +2608,7 @@ abstract class AbstractLookupDiscovery i curGroups, resp.getRegistrar())); if( !listeners.isEmpty() ) { - addNotify((ArrayList)listeners.clone(), - mapRegToGroups(reg,curGroups), CHANGED); + addNotify(listeners, mapRegToGroups(reg,curGroups), CHANGED); }//endif }//end sendChanged @@ -2942,5 +2933,16 @@ abstract class AbstractLookupDiscovery i long getSeqNum() { return seqNum; } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Timestamp: "); + sb.append(tStamp); + sb.append('\n'); + sb.append("Sequence Number: "); + sb.append(seqNum); + sb.append('\n'); + return sb.toString(); + } } }
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscoveryManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscoveryManager.java?rev=1554723&r1=1554722&r2=1554723&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscoveryManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscoveryManager.java Thu Jan 2 02:45:07 2014 @@ -21,11 +21,18 @@ package net.jini.discovery; import com.sun.jini.logging.Levels; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; import net.jini.config.Configuration; import net.jini.config.ConfigurationException; @@ -72,14 +79,23 @@ abstract class AbstractLookupDiscoveryMa * Note that this set is shared across threads; therefore, when * accessing or modifying the contents of this set, the appropriate * synchronization must be applied. + * + * A list is used instead of a Set, because hashCode and equals are + * remote calls. However we need to avoid duplicates and since this + * requires list traversal using remote method calls during add, + * we need a lock to prevent concurrent additions. Removal while an + * add operation is in progress would be acceptable, since the worst + * case scenario is the iterator snapshot would contain the removed + * ProxyReg. */ - private final ArrayList discoveredSet = new ArrayList(1); // sync(discoveredSet) + private final List<ProxyReg> discoveredSet = new CopyOnWriteArrayList<ProxyReg>(); // sync(discoveredSet) + private final Lock discoveredSetAddLock = new ReentrantLock(); /** Contains the instances of <code>DiscoveryListener</code> that clients * register with the <code>LookupDiscoveryManager</code>. The elements * of this set receive discovered events, discarded events and, when * appropriate, changed events. */ - private final ArrayList listeners = new ArrayList(1); // sync(listeners) + private final Set<DiscoveryListener> listeners = Collections.newSetFromMap(new ConcurrentHashMap<DiscoveryListener,Boolean>()); /** The <code>LookupDiscovery</code> utility used to manage the group * discovery mechanism. Note that this object cannot be accessed outside * of this <code>LookupDiscoveryManager</code>. @@ -155,7 +171,7 @@ abstract class AbstractLookupDiscoveryMa * value of <code>true</code> in this flag is equivalent to a * communication discard. */ - public boolean commDiscard = false; // sync(discoveredSet) + public boolean commDiscard = false; // sync(this) /** Integer restricted to the values 0, 1, 2, and 3. Each value * represents a bit (or set of bits) that, when set, indicates the * mechanism (group discovery, locator discovery, or both) through @@ -381,22 +397,26 @@ abstract class AbstractLookupDiscoveryMa public void discovered(DiscoveryEvent e) { ServiceRegistrar[] proxys = (ServiceRegistrar[])e.getRegistrars(); Map groupsMap = e.getGroups(); - HashMap discoveredGroupsMap = new HashMap(proxys.length); - for(int i=0; i<proxys.length; i++) { - synchronized(discoveredSet) { - ProxyReg reg = findReg(proxys[i]); - if(reg == null) {//newly discovered, send event - reg = new ProxyReg(proxys[i], - (String[])(groupsMap.get(proxys[i])), - FROM_LOCATOR); - addDiscoveredSet(reg); - discoveredGroupsMap.put(proxys[i], - groupsMap.get(proxys[i])); - } else {//previously discovered, update bit, send no event - reg.addFrom(FROM_LOCATOR); - }//endif - }//end sync(discoveredSet) - }//end loop + int len = proxys.length; + HashMap discoveredGroupsMap = new HashMap(len); + discoveredSetAddLock.lock(); + try { + for(int i=0; i<len; i++) { + ProxyReg reg = findReg(proxys[i]); + if(reg == null) {//newly discovered, send event + reg = new ProxyReg(proxys[i], + (String[])(groupsMap.get(proxys[i])), + FROM_LOCATOR); + discoveredSet.add(reg); + discoveredGroupsMap.put(proxys[i], + groupsMap.get(proxys[i])); + } else {//previously discovered, update bit, send no event + reg.addFrom(FROM_LOCATOR); + }//endif + }//end loop + } finally { + discoveredSetAddLock.unlock(); + } /* Will send notification only if map is non-empty from above */ notifyListener(discoveredGroupsMap, DISCOVERED); }//end discovered @@ -483,9 +503,9 @@ abstract class AbstractLookupDiscoveryMa public void discarded(DiscoveryEvent e) { ServiceRegistrar[] proxys = (ServiceRegistrar[])e.getRegistrars(); Map groupsMap = e.getGroups(); - HashMap discardedGroupsMap = new HashMap(proxys.length); - for(int i=0; i<proxys.length; i++) { - synchronized(discoveredSet) { + int len = proxys.length; + HashMap discardedGroupsMap = new HashMap(len); + for(int i=0; i<len; i++) { ProxyReg reg = findReg(proxys[i]); if(reg != null) { String[] newGroups = (String[])groupsMap.get @@ -494,26 +514,18 @@ abstract class AbstractLookupDiscoveryMa /* Locator discovery only, always send discarded */ discardedGroupsMap.put(reg.proxy,newGroups); } else {//group and loc discovery - if( reg.commDiscard ) {//unreachable, send later - reg.discard();//discard from LookupDiscovery - }//endif + synchronized (reg){ + if( reg.commDiscard ) {//unreachable, send later + reg.discard();//discard from LookupDiscovery + }//endif + } }//endif }//endif(reg != null) - }//end sync(discoveredSet) }//end loop /* Will send notification only if map is non-empty from above */ notifyListener(discardedGroupsMap, DISCARDED); }//end discarded - /* Convenience method that adds the given <code>ProxyReg</code> - * instance to the managed set of discovered registrars. - */ - void addDiscoveredSet(ProxyReg reg) { - synchronized(discoveredSet) { - discoveredSet.add(reg); - } - }//end addDiscoveredSet - /* Convenience method that first attempts to unset the bit in the * discovery mechanism flag of the given ProxyReg based on the value * of the given discovery mechanism parameter (from). Depending on @@ -529,11 +541,7 @@ abstract class AbstractLookupDiscoveryMa */ boolean removeDiscoveredSet(ProxyReg reg, int from) { boolean bret = reg.removeFrom(from); - if(bret) { - synchronized(discoveredSet) { - discoveredSet.remove(reg); - }//end sync - }//endif + if (bret) discoveredSet.remove(reg); return bret; }//end removeDiscoveredSet }//end class LocatorDiscoveryListener @@ -568,16 +576,18 @@ abstract class AbstractLookupDiscoveryMa public void discovered(DiscoveryEvent e) { ServiceRegistrar[] proxys = (ServiceRegistrar[])e.getRegistrars(); Map groupsMap = e.getGroups(); - HashMap discoveredGroupsMap = new HashMap(proxys.length); - HashMap changedGroupsMap = new HashMap(proxys.length); - for(int i=0; i<proxys.length; i++) { - synchronized(discoveredSet) { + int len = proxys.length; + HashMap discoveredGroupsMap = new HashMap(len); + HashMap changedGroupsMap = new HashMap(len); + discoveredSetAddLock.lock(); + try { + for(int i=0; i<len; i++) { ProxyReg reg = findReg(proxys[i]); if(reg == null) {//newly discovered, send discovered event reg = new ProxyReg(proxys[i], (String[])(groupsMap.get(proxys[i])), FROM_GROUP); - addDiscoveredSet(reg); + discoveredSet.add(reg); discoveredGroupsMap.put(proxys[i], groupsMap.get(proxys[i])); } else {//previously discovered by group, by loc or by both @@ -591,8 +601,10 @@ abstract class AbstractLookupDiscoveryMa changedGroupsMap.put(reg.proxy,newGroups); }//endif }//endif - }//end sync(discoveredSet) - }//end loop + }//end loop + } finally { + discoveredSetAddLock.unlock(); + } /* Will send notification only if map is non-empty from above */ notifyListener(discoveredGroupsMap, DISCOVERED); notifyListener(changedGroupsMap, CHANGED); @@ -723,10 +735,10 @@ abstract class AbstractLookupDiscoveryMa public void discarded(DiscoveryEvent e) { ServiceRegistrar[] proxys = (ServiceRegistrar[])e.getRegistrars(); Map groupsMap = e.getGroups(); - HashMap discardedGroupsMap = new HashMap(proxys.length); - HashMap changedGroupsMap = new HashMap(proxys.length); - for(int i=0; i<proxys.length; i++) { - synchronized(discoveredSet) { + int len = proxys.length; + HashMap discardedGroupsMap = new HashMap(len); + HashMap changedGroupsMap = new HashMap(len); + for(int i=0; i<len; i++) { ProxyReg reg = findReg(proxys[i]); if(reg != null) { String[] newGroups = (String[])groupsMap.get @@ -747,7 +759,6 @@ abstract class AbstractLookupDiscoveryMa }//endif(stillInterested or not) }//endif(removeDiscoveredSet ==> group or group & loc) }//endif(reg != null) - }//end sync(discoveredSet) }//end loop /* Will send notification only if map is non-empty from above */ notifyListener(discardedGroupsMap, DISCARDED); @@ -766,17 +777,16 @@ abstract class AbstractLookupDiscoveryMa /* update the groups of each changed registrar */ ServiceRegistrar[] proxys = (ServiceRegistrar[])e.getRegistrars(); Map groupsMap = e.getGroups(); - HashMap changedGroupsMap = new HashMap(proxys.length); - for(int i=0; i<proxys.length; i++) { - synchronized(discoveredSet) { - ProxyReg reg = findReg(proxys[i]); - if(reg != null) {//previously discovered - String[] newGroups = (String[])groupsMap.get - (reg.proxy); - reg.setMemberGroups(newGroups); - changedGroupsMap.put(reg.proxy,newGroups); - }//endif - }//end sync(discoveredSet) + int len = proxys.length; + HashMap changedGroupsMap = new HashMap(len); + for(int i=0; i<len; i++) { + ProxyReg reg = findReg(proxys[i]); + if(reg != null) {//previously discovered + String[] newGroups = (String[])groupsMap.get + (reg.proxy); + reg.setMemberGroups(newGroups); + changedGroupsMap.put(reg.proxy,newGroups); + }//endif }//end loop /* Will send notification only if map is non-empty from above */ notifyListener(changedGroupsMap, CHANGED); @@ -1081,23 +1091,15 @@ abstract class AbstractLookupDiscoveryMa if(listener == null) { throw new NullPointerException("can't add null listener"); } - // Unsynchronized access to discoveredSet, was this to avoid deadlock? - // I've cloned it, it wasn't likely to get an up to date copy without sync anyway. - ArrayList discoveredSetClone; - synchronized (discoveredSet){ - discoveredSetClone = new ArrayList(discoveredSet); - } - HashMap groupsMap = new HashMap(discoveredSetClone.size()); - synchronized(listeners) { - if(!listeners.contains(listener)) { - listeners.add(listener); - } - if(discoveredSetClone.isEmpty()) return; - for(int i=0; i< discoveredSetClone.size(); i++) { - ProxyReg reg = (ProxyReg)discoveredSetClone.get(i); - groupsMap.put(reg.proxy,reg.getMemberGroups()); - } - } + + listeners.add(listener); + if (discoveredSet.isEmpty()) return; + HashMap groupsMap = new HashMap(discoveredSet.size()); + Iterator<ProxyReg> it = discoveredSet.iterator(); + while (it.hasNext()){ + ProxyReg reg = it.next(); + groupsMap.put(reg.proxy,reg.getMemberGroups()); + } notifyListener(listener, groupsMap, DISCOVERED); } @@ -1115,9 +1117,7 @@ abstract class AbstractLookupDiscoveryMa * @see #addDiscoveryListener */ public void removeDiscoveryListener(DiscoveryListener listener) { - synchronized(listeners) { - listeners.remove(listener); - } + listeners.remove(listener); } /** @@ -1133,19 +1133,14 @@ abstract class AbstractLookupDiscoveryMa * @see net.jini.discovery.DiscoveryManagement#removeDiscoveryListener */ public ServiceRegistrar[] getRegistrars() { - ArrayList proxySet = new ArrayList(1);; - synchronized(discoveredSet) { - int k = 0; - Iterator iter = discoveredSet.iterator(); - while(iter.hasNext()) { - ProxyReg reg = (ProxyReg)iter.next(); - if(!reg.isDiscarded()) - proxySet.add(reg.proxy); - } - } - ServiceRegistrar[] ret = new ServiceRegistrar[proxySet.size()]; - proxySet.toArray(ret); - return ret; + List<ServiceRegistrar> proxySet = new LinkedList<ServiceRegistrar>(); + Iterator<ProxyReg> iter = discoveredSet.iterator(); + while(iter.hasNext()) { + ProxyReg reg = iter.next(); + if(!reg.isDiscarded()) + proxySet.add(reg.proxy); + } + return proxySet.toArray(new ServiceRegistrar[proxySet.size()]); } /** @@ -1166,10 +1161,10 @@ abstract class AbstractLookupDiscoveryMa if(proxy == null) return; ProxyReg reg = findReg(proxy); if(reg != null) { - synchronized(discoveredSet) { - reg.discard(); + synchronized (reg){ + reg.discard(); reg.commDiscard = true; - }//end sync(discoveredSet) + } }//endif }//end discard @@ -1184,9 +1179,7 @@ abstract class AbstractLookupDiscoveryMa * @see net.jini.discovery.DiscoveryManagement#terminate */ public void terminate() { - synchronized(listeners) { - listeners.clear(); - } + listeners.clear(); lookupDisc.terminate(); locatorDisc.terminate(); } @@ -1218,14 +1211,12 @@ abstract class AbstractLookupDiscoveryMa private ProxyReg findReg(ServiceRegistrar proxy) { - synchronized(discoveredSet) { - Iterator iter = discoveredSet.iterator(); - while(iter.hasNext()) { - ProxyReg reg =(ProxyReg)iter.next(); - if(reg.proxy.equals(proxy)) - return reg; - } - } + Iterator iter = discoveredSet.iterator(); + while(iter.hasNext()) { + ProxyReg reg =(ProxyReg)iter.next(); + if(reg.proxy.equals(proxy)) + return reg; + } return null; } @@ -1240,13 +1231,9 @@ abstract class AbstractLookupDiscoveryMa */ private void notifyListener(Map groupsMap, int eventType) { if(groupsMap.isEmpty()) return; - ArrayList notifies; - synchronized(listeners) { - notifies = (ArrayList)listeners.clone(); - } - Iterator iter = notifies.iterator(); + Iterator<DiscoveryListener> iter = listeners.iterator(); while(iter.hasNext()) { - DiscoveryListener l = (DiscoveryListener)iter.next(); + DiscoveryListener l = iter.next(); try { notifyListener(l, groupsMap, eventType); } catch (Throwable t) {
