Author: peter_firmstone Date: Tue Dec 10 11:20:57 2013 New Revision: 1549810
URL: http://svn.apache.org/r1549810 Log: Attempting to fix test failures on Windows Server 2008 Jdk1.7.0_25, using Jenkins as test environment, refactoring of RegistrarImpl Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/InterruptedStatusThread.java Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java?rev=1549810&r1=1549809&r2=1549810&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java Tue Dec 10 11:20:57 2013 @@ -86,7 +86,7 @@ public class MultipleEvntLeaseRenewals e } /** The event handler for the services registered by this class */ - private static BasicListener listener; + private BasicListener listener; /** Performs actions necessary to prepare for execution of the * current QA test. Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java?rev=1549810&r1=1549809&r2=1549810&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java Tue Dec 10 11:20:57 2013 @@ -38,6 +38,7 @@ import com.sun.jini.reliableLog.LogHandl import com.sun.jini.reliableLog.ReliableLog; import com.sun.jini.start.LifeCycle; import com.sun.jini.thread.InterruptedStatusThread; +import com.sun.jini.thread.InterruptedStatusThread.Interruptable; import com.sun.jini.thread.ReadersWriter; import com.sun.jini.thread.ReadersWriter.ConcurrentLockException; //import com.sun.jini.thread.ReadyState; @@ -88,7 +89,9 @@ import java.util.NoSuchElementException; import java.util.Random; import java.util.Set; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Executor; @@ -96,6 +99,8 @@ import java.util.concurrent.LinkedBlocki import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import javax.net.ServerSocketFactory; @@ -221,7 +226,7 @@ class RegistrarImpl implements Registrar * Identity map from SvcReg to SvcReg, ordered by lease expiration. * Every service is in this map. */ - private final SortedMap<SvcReg,SvcReg> serviceByTime = new TreeMap<SvcReg,SvcReg>(); + private final SortedSet<SvcReg> serviceByTime = new TreeSet<SvcReg>(); /** * Map from String to HashMap mapping ServiceID to SvcReg. Every service * is in this map under its types. @@ -316,7 +321,8 @@ class RegistrarImpl implements Registrar /** Event lease expiration thread */ private final Thread eventExpirer; /** Unicast discovery request packet receiving thread */ - private volatile UnicastThread unicaster; + private volatile Thread unicaster; + private volatile Unicast unicast; /** Multicast discovery request packet receiving thread */ private final Thread multicaster; /** Multicast discovery announcement sending thread */ @@ -341,7 +347,7 @@ class RegistrarImpl implements Registrar /** Flag indicating whether system is in a state of recovery */ private volatile boolean inRecovery; /** Current number of records in the Log File since the last snapshot */ - private int logFileSize = 0; + private final AtomicInteger logFileSize = new AtomicInteger(); /** Log file must contain this many records before snapshot allowed */ private final int persistenceSnapshotThreshold ; @@ -366,7 +372,7 @@ class RegistrarImpl implements Registrar /** Interval to wait in between sending multicast announcements */ private final long multicastAnnouncementInterval; /** Multicast announcement sequence number */ - private volatile long announcementSeqNo = 0L; + private final AtomicLong announcementSeqNo = new AtomicLong(); /** Network interfaces to use for multicast discovery */ private final NetworkInterface[] multicastInterfaces; @@ -536,7 +542,6 @@ class RegistrarImpl implements Registrar TimeUnit.MINUTES, new LinkedBlockingQueue() ); -// exec.allowCoreThreadTimeOut(true); eventNotifierExec = exec; // Set up Executor to perform discovery responses, this will naturally // shutdown when Reggie terminates. @@ -547,13 +552,12 @@ class RegistrarImpl implements Registrar TimeUnit.MINUTES, new LinkedBlockingQueue() ); -// exec.allowCoreThreadTimeOut(true); discoveryResponseExec = exec; ReliableLog log = null; Thread serviceExpirer = null; Thread eventExpirer = null; - UnicastThread unicaster = null; + Thread unicaster = null; Thread multicaster = null; Thread announcer = null; Thread snapshotter = null; @@ -564,37 +568,46 @@ class RegistrarImpl implements Registrar @Override public List<Thread> run() throws Exception { + Thread t; List<Thread> list = new ArrayList<Thread>(6); - list.add(new ServiceExpireThread()); - list.add(new EventExpireThread()); - list.add(new UnicastThread(unicastPort)); - list.add(new MulticastThread()); - list.add(new AnnounceThread()); - list.add(new SnapshotThread()); + list.add(newDaemonThread(new ServiceExpire(RegistrarImpl.this), "service expire")); + list.add(newDaemonThread(new EventExpire(RegistrarImpl.this),"event expire")); + unicast = new Unicast(RegistrarImpl.this, unicastPort); + list.add(newInterruptStatusThread(unicast, "unicast request")); + list.add(newInterruptStatusThread(new Multicast(RegistrarImpl.this), "multicast request")); + list.add(newDaemonThread(new Announce(RegistrarImpl.this),"discovery announcement")); + list.add(newDaemonThread(new Snapshot(RegistrarImpl.this),"snapshot thread")); return list; } + + private Thread newDaemonThread(Runnable r, String name){ + Thread t = new Thread(r,name); + t.setDaemon(true); + return t; + } + + private Thread newInterruptStatusThread(Runnable r, String name){ + Thread t = new InterruptedStatusThread(r,name); + t.setDaemon(true); + return t; + } }, context); serviceExpirer = threads.get(0); eventExpirer = threads.get(1); - unicaster = (UnicastThread) threads.get(2); + unicaster = threads.get(2); multicaster = threads.get(3); announcer = threads.get(4); snapshotter = threads.get(5); if (init.persistent){ - log = new ReliableLog(init.persistenceDirectory, new LocalLogHandler()); + log = new ReliableLog(init.persistenceDirectory, new LocalLogHandler(this)); if (logger.isLoggable(Level.CONFIG)) { logger.log(Level.CONFIG, "using persistence directory {0}", new Object[]{ init.persistenceDirectory }); } - log.recover(); } else { log = null; } - // log snapshot recovers myServiceID - if (myServiceID == null) { - myServiceID = newServiceID(); - } constructionException = null; } catch (PrivilegedActionException ex) { @@ -615,7 +628,6 @@ class RegistrarImpl implements Registrar loginContext = init.loginContext; unicastDiscoveryHost = init.unicastDiscoveryHost; config = init.config; - computeMaxLeases(); } /** A service item registration record. */ @@ -640,7 +652,7 @@ class RegistrarImpl implements Registrar * * @serial */ - public long leaseExpiration; + public volatile long leaseExpiration; /** Simple constructor */ public SvcReg(Item item, Uuid leaseID, long leaseExpiration) { @@ -1733,18 +1745,20 @@ class RegistrarImpl implements Registrar * where 'dirname' is the name of the directory path (relative or * absolute) where the snapshot and log file will be maintained. */ - private class LocalLogHandler extends LogHandler { - + private static class LocalLogHandler extends LogHandler { + private final RegistrarImpl reggie; /** Simple constructor */ - public LocalLogHandler() { } + public LocalLogHandler(RegistrarImpl reggie) { + this.reggie = reggie; + } /* Overrides snapshot() defined in ReliableLog's LogHandler class. */ public void snapshot(OutputStream out) throws IOException { - concurrentObj.readLock(); + reggie.concurrentObj.readLock(); try { - takeSnapshot(out); + reggie.takeSnapshot(out); } finally { - concurrentObj.readUnlock(); + reggie.concurrentObj.readUnlock(); } } @@ -1752,11 +1766,11 @@ class RegistrarImpl implements Registrar public void recover(InputStream in) throws IOException, ClassNotFoundException { - concurrentObj.writeLock(); + reggie.concurrentObj.writeLock(); try { - recoverSnapshot(in); + reggie.recoverSnapshot(in); } finally { - concurrentObj.writeUnlock(); + reggie.concurrentObj.writeUnlock(); } } @@ -1783,12 +1797,12 @@ class RegistrarImpl implements Registrar * by the type of record that was retrieved. */ public void applyUpdate(Object logRecObj) { - ((LogRecord)logRecObj).apply(RegistrarImpl.this); + ((LogRecord)logRecObj).apply(reggie); } } /** Base class for iterating over all Items that match a Template. */ - private abstract class ItemIter { + private static abstract class ItemIter { /** Current time */ public final long now = System.currentTimeMillis(); /** True means duplicate items are possible */ @@ -1831,18 +1845,19 @@ class RegistrarImpl implements Registrar } /** Iterate over all Items. */ - private class AllItemIter extends ItemIter { + private static class AllItemIter extends ItemIter { /** Iterator over serviceByID */ - private final Iterator iter; + private final Iterator<SvcReg> iter; /** Assumes the empty template */ - public AllItemIter() { + public AllItemIter(Iterator<SvcReg> it) { super(null); - iter = serviceByID.values().iterator(); + iter = it; step(); } /** Set reg to the next matching element, or null if none */ + @Override protected void step() { while (iter.hasNext()) { reg = (SvcReg)iter.next(); @@ -1854,20 +1869,17 @@ class RegistrarImpl implements Registrar } /** Iterates over all services that match template's service types */ - private class SvcIterator extends ItemIter { + private static class SvcIterator extends ItemIter { /** Iterator for list of matching services. */ - private final Iterator services; + private final Iterator<SvcReg> services; /** * tmpl.serviceID == null and * tmpl.serviceTypes is not empty */ - public SvcIterator(Template tmpl) { + public SvcIterator(Template tmpl, Iterator<SvcReg> it) { super(tmpl); - Map map = (Map) serviceByTypeName.get( - tmpl.serviceTypes[0].getName()); - services = map != null ? map.values().iterator() : - Collections.EMPTY_LIST.iterator(); + services = it; step(); } @@ -1894,35 +1906,24 @@ class RegistrarImpl implements Registrar } /** Iterate over all matching Items by attribute value. */ - private class AttrItemIter extends ItemIter { + private static class AttrItemIter extends ItemIter { /** SvcRegs obtained from serviceByAttr for chosen attr */ - protected List<SvcReg> svcs; + private final List<SvcReg> svcs; /** Current index into svcs */ - protected int svcidx; + private int svcidx; /** * tmpl.serviceID == null and * tmpl.serviceTypes is empty and * tmpl.attributeSetTemplates[setidx].fields[fldidx] != null */ - public AttrItemIter(Template tmpl, int setidx, int fldidx) { - super(tmpl); - EntryRep set = tmpl.attributeSetTemplates[setidx]; - HashMap<Object,List<SvcReg>>[] attrMaps = - serviceByAttr.get(getDefiningClass(set.eclass, - fldidx)); - if (attrMaps != null && attrMaps[fldidx] != null) { - svcs = attrMaps[fldidx].get(set.fields[fldidx]); - if (svcs != null) { - svcidx = svcs.size(); - step(); - } - } - } - - /** Simple constructor */ - protected AttrItemIter(Template tmpl) { + public AttrItemIter(Template tmpl, List<SvcReg> svcs) { super(tmpl); + this.svcs = svcs; + if (svcs != null) { + svcidx = svcs.size(); + step(); + } } /** Set reg to the next matching element, or null if none. */ @@ -1938,24 +1939,6 @@ class RegistrarImpl implements Registrar } } - /** Iterate over all matching Items by no-fields entry class. */ - private class EmptyAttrItemIter extends AttrItemIter { - - /** - * tmpl.serviceID == null and - * tmpl.serviceTypes is empty and - * eclass has no fields - */ - public EmptyAttrItemIter(Template tmpl, EntryClass eclass) { - super(tmpl); - svcs = serviceByEmptyAttr.get(eclass); - if (svcs != null) { - svcidx = svcs.size(); - step(); - } - } - } - /** Iterate over all matching Items by entry class, dups possible. */ private class ClassItemIter extends ItemIter { /** Entry class to match on */ @@ -2040,15 +2023,16 @@ class RegistrarImpl implements Registrar } /** Iterate over a singleton matching Item by serviceID. */ - private class IDItemIter extends ItemIter { + private static class IDItemIter extends ItemIter { /** tmpl.serviceID != null */ - public IDItemIter(Template tmpl) { + public IDItemIter(Template tmpl, SvcReg reg) { super(tmpl); - reg = serviceByID.get(tmpl.serviceID); if (reg != null && - (reg.leaseExpiration <= now || !matchItem(tmpl, reg.item))) + (reg.leaseExpiration <= now || !matchItem(tmpl, reg.item))) { reg = null; + } + this.reg = reg; } /** Set reg to null */ @@ -2405,30 +2389,29 @@ class RegistrarImpl implements Registrar } /** Service lease expiration thread code */ - private class ServiceExpireThread extends InterruptedStatusThread { - + private static class ServiceExpire implements Runnable { + RegistrarImpl reggie; /** Create a daemon thread */ - public ServiceExpireThread() { - super("service expire"); - setDaemon(true); + public ServiceExpire(RegistrarImpl reggie) { + this.reggie = reggie; } public void run() { try { - concurrentObj.writeLock(); + reggie.concurrentObj.writeLock(); } catch (ConcurrentLockException e) { return; } try { - while (!hasBeenInterrupted()) { + while (!Thread.currentThread().isInterrupted()) { long now = System.currentTimeMillis(); while (true) { - SvcReg reg = serviceByTime.firstKey(); - minSvcExpiration = reg.leaseExpiration; - if (minSvcExpiration > now) + SvcReg reg = reggie.serviceByTime.first(); + reggie.minSvcExpiration = reg.leaseExpiration; + if (reggie.minSvcExpiration > now) break; - deleteService(reg, now); - addLogRecord(new ServiceLeaseCancelledLogObj( + reggie.deleteService(reg, now); + reggie.addLogRecord(new ServiceLeaseCancelledLogObj( reg.item.serviceID, reg.leaseID)); if (logger.isLoggable(Level.FINE)) { logger.log( @@ -2437,46 +2420,44 @@ class RegistrarImpl implements Registrar new Object[]{ reg.item.serviceID }); } } - queueEvents(); try { - concurrentObj.writerWait(serviceNotifier, - minSvcExpiration - now); + reggie.concurrentObj.writerWait(reggie.serviceNotifier, + reggie.minSvcExpiration - now); } catch (ConcurrentLockException e) { return; } } } finally { - concurrentObj.writeUnlock(); + reggie.concurrentObj.writeUnlock(); } } } /** Event lease expiration thread code */ - private class EventExpireThread extends InterruptedStatusThread { - + private static class EventExpire implements Runnable { + private final RegistrarImpl reggie; /** Create a daemon thread */ - public EventExpireThread() { - super("event expire"); - setDaemon(true); + public EventExpire(RegistrarImpl reggie) { + this.reggie = reggie; } public void run() { try { - concurrentObj.writeLock(); + reggie.concurrentObj.writeLock(); } catch (ConcurrentLockException e) { return; } try { - while (!hasBeenInterrupted()) { + while (!Thread.currentThread().isInterrupted()) { long now = System.currentTimeMillis(); - minEventExpiration = Long.MAX_VALUE; - while (!eventByTime.isEmpty()) { - EventReg reg = eventByTime.firstKey(); + reggie.minEventExpiration = Long.MAX_VALUE; + while (!reggie.eventByTime.isEmpty()) { + EventReg reg = reggie.eventByTime.firstKey(); if (reg.getLeaseExpiration() > now) { - minEventExpiration = reg.getLeaseExpiration(); + reggie.minEventExpiration = reg.getLeaseExpiration(); break; } - deleteEvent(reg); + reggie.deleteEvent(reg); if (logger.isLoggable(Level.FINE)) { logger.log( Level.FINE, @@ -2485,14 +2466,14 @@ class RegistrarImpl implements Registrar } } try { - concurrentObj.writerWait(eventNotifier, - minEventExpiration - now); + reggie.concurrentObj.writerWait(reggie.eventNotifier, + reggie.minEventExpiration - now); } catch (ConcurrentLockException e) { return; } } } finally { - concurrentObj.writeUnlock(); + reggie.concurrentObj.writeUnlock(); } } } @@ -2585,24 +2566,25 @@ class RegistrarImpl implements Registrar } /** Multicast discovery request thread code. */ - private class MulticastThread extends InterruptedStatusThread { - + private static class Multicast implements Runnable, Interruptable { + private final RegistrarImpl reggie; /** Multicast group address used by multicast requests */ private final InetAddress requestAddr; /** Multicast socket to receive packets */ private final MulticastSocket socket; /** Interfaces for which configuration failed */ private final List<NetworkInterface> failedInterfaces; + + private volatile boolean interrupted = false; /** * Create a high priority daemon thread. Set up the socket now * rather than in run, so that we get any exception up front. */ - public MulticastThread() throws IOException { - super("multicast request"); - setDaemon(true); + public Multicast(RegistrarImpl reggie) throws IOException { + this.reggie = reggie; List<NetworkInterface> failedInterfaces = new ArrayList<NetworkInterface>(); - if (multicastInterfaces != null && multicastInterfaces.length == 0) + if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0) { requestAddr = null; socket = null; @@ -2611,11 +2593,12 @@ class RegistrarImpl implements Registrar } requestAddr = Constants.getRequestAddress(); socket = new MulticastSocket(Constants.discoveryPort); - if (multicastInterfaces != null) { - Level failureLogLevel = multicastInterfacesSpecified ? + if (reggie.multicastInterfaces != null) { + Level failureLogLevel = reggie.multicastInterfacesSpecified ? Level.WARNING : Levels.HANDLED; - for (int i = 0; i < multicastInterfaces.length; i++) { - NetworkInterface nic = multicastInterfaces[i]; + int l = reggie.multicastInterfaces.length; + for (int i = 0; i < l; i++) { + NetworkInterface nic = reggie.multicastInterfaces[i]; try { socket.setNetworkInterface(nic); socket.joinGroup(requestAddr); @@ -2646,17 +2629,17 @@ class RegistrarImpl implements Registrar } public void run() { - if (multicastInterfaces != null && multicastInterfaces.length == 0) + if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0) { return; } byte[] buf = new byte[ - multicastRequestConstraints.getMulticastMaxPacketSize( + reggie.multicastRequestConstraints.getMulticastMaxPacketSize( DEFAULT_MAX_PACKET_SIZE)]; DatagramPacket dgram = new DatagramPacket(buf, buf.length); long retryTime = - System.currentTimeMillis() + multicastInterfaceRetryInterval; - while (!hasBeenInterrupted()) { + System.currentTimeMillis() + reggie.multicastInterfaceRetryInterval; + while (!interrupted) { try { int timeout = 0; if (!failedInterfaces.isEmpty()) { @@ -2667,7 +2650,7 @@ class RegistrarImpl implements Registrar if (failedInterfaces.isEmpty()) { timeout = 0; } else { - timeout = multicastInterfaceRetryInterval; + timeout = reggie.multicastInterfaceRetryInterval; retryTime = System.currentTimeMillis() + timeout; } @@ -2689,12 +2672,12 @@ class RegistrarImpl implements Registrar } catch (BufferUnderflowException e) { throw new DiscoveryProtocolException(null, e); } - multicastRequestConstraints.checkProtocolVersion(pv); - discoveryResponseExec.execute( + reggie.multicastRequestConstraints.checkProtocolVersion(pv); + reggie.discoveryResponseExec.execute( new DecodeRequestTask( dgram, - getDiscovery(pv), - RegistrarImpl.this + reggie.getDiscovery(pv), + reggie ) ); @@ -2706,7 +2689,7 @@ class RegistrarImpl implements Registrar } catch (InterruptedIOException e) { break; } catch (Exception e) { - if (hasBeenInterrupted()) { + if (interrupted) { break; } logger.log(Levels.HANDLED, @@ -2718,9 +2701,8 @@ class RegistrarImpl implements Registrar public void interrupt() { // close socket to interrupt MulticastSocket.receive operation - if (socket != null) - socket.close(); - super.interrupt(); + interrupted = true; + if (socket != null) socket.close(); } /** @@ -2730,8 +2712,8 @@ class RegistrarImpl implements Registrar * interface. */ private void retryFailedInterfaces() { - for (Iterator i = failedInterfaces.iterator(); i.hasNext(); ) { - NetworkInterface nic = (NetworkInterface) i.next(); + for (Iterator<NetworkInterface> i = failedInterfaces.iterator(); i.hasNext(); ) { + NetworkInterface nic = i.next(); try { if (nic != null) { socket.setNetworkInterface(nic); @@ -2739,7 +2721,7 @@ class RegistrarImpl implements Registrar socket.joinGroup(requestAddr); i.remove(); - Level l = multicastInterfacesSpecified ? + Level l = reggie.multicastInterfacesSpecified ? Level.INFO : Level.FINE; if (logger.isLoggable(l)) { if (nic != null) { @@ -2756,40 +2738,42 @@ class RegistrarImpl implements Registrar } /** Unicast discovery request thread code. */ - private class UnicastThread extends InterruptedStatusThread { + private static class Unicast implements Runnable, Interruptable { + private final RegistrarImpl reggie; /** Server socket to accepts connections on. */ private final ServerSocket listen; /** Listen port */ public final int port; + private volatile boolean interrupted = false; + /** * Create a daemon thread. Set up the socket now rather than in run, * so that we get any exception up front. */ - public UnicastThread(int port) throws IOException { - super("unicast request"); - setDaemon(true); + public Unicast(RegistrarImpl reggie, int port) throws IOException { + this.reggie = reggie; ServerSocket listen = null; if (port == 0) { try { - listen = serverSocketFactory.createServerSocket(Constants.discoveryPort); + listen = reggie.serverSocketFactory.createServerSocket(Constants.discoveryPort); } catch (IOException e) { logger.log( Levels.HANDLED, "failed to bind to default port", e); } } if (listen == null) { - listen = serverSocketFactory.createServerSocket(port); + listen = reggie.serverSocketFactory.createServerSocket(port); } this.listen = listen; this.port = listen.getLocalPort(); } public void run() { - while (!hasBeenInterrupted()) { + while (!interrupted) { try { Socket socket = listen.accept(); - if (hasBeenInterrupted()) { + if (interrupted) { try { socket.close(); } catch (IOException e) { @@ -2798,7 +2782,7 @@ class RegistrarImpl implements Registrar } break; } - discoveryResponseExec.execute(new SocketTask(socket, RegistrarImpl.this)); + reggie.discoveryResponseExec.execute(new SocketTask(socket, reggie)); } catch (InterruptedIOException e) { break; } catch (Exception e) { @@ -2822,17 +2806,17 @@ class RegistrarImpl implements Registrar */ public void interrupt() { try { - Socket s = socketFactory.createSocket(InetAddress.getLocalHost(), port); + interrupted = true; + Socket s = reggie.socketFactory.createSocket(InetAddress.getLocalHost(), port); s.close(); } catch (IOException e) { } - super.interrupt(); } - } /** Multicast discovery announcement thread code. */ - private class AnnounceThread extends InterruptedStatusThread { + private static class Announce implements Runnable { + private final RegistrarImpl reggie; /** Multicast socket to send packets on */ private final MulticastSocket socket; /** Cached datagram packets */ @@ -2846,14 +2830,15 @@ class RegistrarImpl implements Registrar * Create a daemon thread. Set up the socket now rather than in run, * so that we get any exception up front. */ - public AnnounceThread() throws IOException { - super("discovery announcement"); - setDaemon(true); - if (multicastInterfaces == null || multicastInterfaces.length > 0) + public Announce(RegistrarImpl reggie) throws IOException { + this.reggie = reggie; +// super("discovery announcement"); +// setDaemon(true); + if (reggie.multicastInterfaces == null || reggie.multicastInterfaces.length > 0) { socket = new MulticastSocket(); socket.setTimeToLive( - multicastAnnouncementConstraints.getMulticastTimeToLive( + reggie.multicastAnnouncementConstraints.getMulticastTimeToLive( DEFAULT_MULTICAST_TTL)); } else { socket = null; @@ -2861,17 +2846,17 @@ class RegistrarImpl implements Registrar } public synchronized void run() { - if (multicastInterfaces != null && multicastInterfaces.length == 0) + if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0) { return; } try { - while (!hasBeenInterrupted() && announce(memberGroups)) { - wait(multicastAnnouncementInterval); + while (!Thread.currentThread().isInterrupted() && announce(reggie.memberGroups)) { + wait(reggie.multicastAnnouncementInterval); } } catch (InterruptedException e) { } - if (memberGroups.length > 0) + if (reggie.memberGroups.length > 0) announce(new String[0]);//send NO_GROUPS just before shutdown socket.close(); } @@ -2882,26 +2867,27 @@ class RegistrarImpl implements Registrar * synchronized run method in thread. */ private boolean announce(String[] groups) { - if (dataPackets == null || !lastLocator.equals(myLocator) || + if (dataPackets == null || !lastLocator.equals(reggie.myLocator) || !Arrays.equals(lastGroups, groups)) { List packets = new ArrayList(); Discovery disco; try { - disco = getDiscovery(multicastAnnouncementConstraints + disco = reggie.getDiscovery(reggie.multicastAnnouncementConstraints .chooseProtocolVersion()); } catch (DiscoveryProtocolException e) { throw new AssertionError(e); } + LookupLocator myLocator = reggie.myLocator;// Atomic EncodeIterator ei = disco.encodeMulticastAnnouncement( - new MulticastAnnouncement(announcementSeqNo++, + new MulticastAnnouncement(reggie.announcementSeqNo.getAndIncrement(), myLocator.getHost(), myLocator.getPort(), groups, - myServiceID), - multicastAnnouncementConstraints + reggie.myServiceID), + reggie.multicastAnnouncementConstraints .getMulticastMaxPacketSize(DEFAULT_MAX_PACKET_SIZE), - multicastAnnouncementConstraints + reggie.multicastAnnouncementConstraints .getUnfulfilledConstraints()); while (ei.hasNext()) { try { @@ -2937,11 +2923,12 @@ class RegistrarImpl implements Registrar private void send(DatagramPacket[] packets) throws InterruptedIOException { - if (multicastInterfaces != null) { - Level failureLogLevel = multicastInterfacesSpecified ? + if (reggie.multicastInterfaces != null) { + Level failureLogLevel = reggie.multicastInterfacesSpecified ? Level.WARNING : Levels.HANDLED; - for (int i = 0; i < multicastInterfaces.length; i++) { - send(packets, multicastInterfaces[i], failureLogLevel); + int l = reggie.multicastInterfaces.length; + for (int i = 0; i < l; i++) { + send(packets, reggie.multicastInterfaces[i], failureLogLevel); } } else { send(packets, null, Level.WARNING); @@ -3046,33 +3033,35 @@ class RegistrarImpl implements Registrar * be locked while a reader mutex is locked, allows the snapshot to * be treated as a reader process. */ - private class SnapshotThread extends InterruptedStatusThread { - + private static class Snapshot implements Runnable { + RegistrarImpl reggie; /** Create a daemon thread */ - public SnapshotThread() { - super("snapshot thread"); - setDaemon(true); + public Snapshot(RegistrarImpl reggie) { + this.reggie = reggie; +// super("snapshot thread"); +// setDaemon(true); } public void run() { - if (log == null) { + if (reggie.log == null) { return; } try { - concurrentObj.readLock(); + reggie.concurrentObj.readLock(); } catch (ConcurrentLockException e) { return; } try { - while (!hasBeenInterrupted()) { + while (!Thread.currentThread().isInterrupted()) { try { - concurrentObj.readerWait(snapshotNotifier, + reggie.concurrentObj.readerWait(reggie.snapshotNotifier, Long.MAX_VALUE); try { - log.snapshot(); - logFileSize = 0; + reggie.log.snapshot(); + reggie.logFileSize.set(0); } catch (Exception e) { - if (hasBeenInterrupted()) + // InterruptedException is never thrown in try + if (Thread.currentThread().isInterrupted()) return; logger.log(Level.WARNING, "snapshot failed", e); } @@ -3081,7 +3070,7 @@ class RegistrarImpl implements Registrar } } } finally { - concurrentObj.readUnlock(); + reggie.concurrentObj.readUnlock(); } } } @@ -3242,7 +3231,6 @@ class RegistrarImpl implements Registrar throw new SecurityException("privileged service id"); addAttributesDo(serviceID, leaseID, attrSets); addLogRecord(new AttrsAddedLogObj(serviceID, leaseID, attrSets)); - queueEvents(); } finally { concurrentObj.writeUnlock(); } @@ -3263,7 +3251,6 @@ class RegistrarImpl implements Registrar modifyAttributesDo(serviceID, leaseID, attrSetTmpls, attrSets); addLogRecord(new AttrsModifiedLogObj(serviceID, leaseID, attrSetTmpls, attrSets)); - queueEvents(); } finally { concurrentObj.writeUnlock(); } @@ -3282,7 +3269,6 @@ class RegistrarImpl implements Registrar throw new SecurityException("privileged service id"); setAttributesDo(serviceID, leaseID, attrSets); addLogRecord(new AttrsSetLogObj(serviceID, leaseID, attrSets)); - queueEvents(); } finally { concurrentObj.writeUnlock(); } @@ -3297,7 +3283,6 @@ class RegistrarImpl implements Registrar try { cancelServiceLeaseDo(serviceID, leaseID); addLogRecord(new ServiceLeaseCancelledLogObj(serviceID, leaseID)); - queueEvents(); if (logger.isLoggable(Level.FINE)) { logger.log( Level.FINE, @@ -3384,7 +3369,6 @@ class RegistrarImpl implements Registrar try { Exception[] exceptions = cancelLeasesDo(regIDs, leaseIDs); addLogRecord(new LeasesCancelledLogObj(regIDs, leaseIDs)); - queueEvents(); if (logger.isLoggable(Level.FINE)) { for (int i = 0; i < regIDs.length; i++) { if (exceptions != null && exceptions[i] != null) { @@ -3431,7 +3415,6 @@ class RegistrarImpl implements Registrar joiner.addAttributes(attrSets); lookupAttrs = joiner.getAttributes(); addLogRecord(new LookupAttributesChangedLogObj(lookupAttrs)); - queueEvents(); } catch (UnknownLeaseException e) { throw new AssertionError("Self-registration never expires"); } finally { @@ -3453,7 +3436,6 @@ class RegistrarImpl implements Registrar joiner.modifyAttributes(attrSetTemplates, attrSets, true); lookupAttrs = joiner.getAttributes(); addLogRecord(new LookupAttributesChangedLogObj(lookupAttrs)); - queueEvents(); } catch (UnknownLeaseException e) { throw new AssertionError("Self-registration never expires"); } finally { @@ -3733,28 +3715,32 @@ class RegistrarImpl implements Registrar try { if (port == unicastPort) return; - if ((port == 0 && unicaster.port == Constants.discoveryPort) || - port == unicaster.port) + if ((port == 0 && unicast.port == Constants.discoveryPort) || + port == unicast.port) { unicastPort = port; addLogRecord(new UnicastPortSetLogObj(port)); return; } /* create a UnicastThread that listens on the new port */ - UnicastThread newUnicaster = new UnicastThread(port); + unicast = new Unicast(this, port); + Thread newUnicaster = new InterruptedStatusThread( unicast , "unicast request"); + newUnicaster.setDaemon(true); /* terminate the current UnicastThread listening on the old port */ unicaster.interrupt(); try { unicaster.join(); - } catch (InterruptedException e) { } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Restore the interrupt. + } /* start the UnicastThread listening on the new port */ unicaster = newUnicaster; unicaster.start(); unicastPort = port; myLocator = (proxy instanceof RemoteMethodControl) ? new ConstrainableLookupLocator( - myLocator.getHost(), unicaster.port, null) : - new LookupLocator(myLocator.getHost(), unicaster.port); + myLocator.getHost(), unicast.port, null) : + new LookupLocator(myLocator.getHost(), unicast.port); synchronized (announcer) { announcer.notify(); } @@ -3763,7 +3749,7 @@ class RegistrarImpl implements Registrar logger.log( Level.CONFIG, "changed unicast discovery port to {0}", - new Object[]{ Integer.valueOf(unicaster.port) }); + new Object[]{ Integer.valueOf(unicast.port) }); } } finally { concurrentObj.writeUnlock(); @@ -4251,7 +4237,7 @@ class RegistrarImpl implements Registrar */ private void addService(SvcReg reg) { serviceByID.put(reg.item.serviceID, reg); - serviceByTime.put(reg, reg); + serviceByTime.add(reg); addServiceByTypes(reg.item.serviceType, reg); EntryRep[] entries = reg.item.attributeSets; for (int i = entries.length; --i >= 0; ) { @@ -4520,23 +4506,36 @@ class RegistrarImpl implements Registrar /** Return an appropriate iterator for Items matching the Template. */ private ItemIter matchingItems(Template tmpl) { if (tmpl.serviceID != null) - return new IDItemIter(tmpl); - if (!isEmpty(tmpl.serviceTypes)) - return new SvcIterator(tmpl); + return new IDItemIter(tmpl, serviceByID.get(tmpl.serviceID)); + if (!isEmpty(tmpl.serviceTypes)){ + Map<ServiceID,SvcReg> map = serviceByTypeName.get( + tmpl.serviceTypes[0].getName()); + Iterator<SvcReg> services = map != null ? map.values().iterator() : + Collections.EMPTY_LIST.iterator(); + return new SvcIterator(tmpl, services); + } EntryRep[] sets = tmpl.attributeSetTemplates; if (isEmpty(sets)) - return new AllItemIter(); + return new AllItemIter(serviceByID.values().iterator()); for (int i = sets.length; --i >= 0; ) { Object[] fields = sets[i].fields; if (fields.length == 0) { EntryClass eclass = getEmptyEntryClass(sets[i].eclass); if (eclass != null) - return new EmptyAttrItemIter(tmpl, eclass); + return new AttrItemIter(tmpl, serviceByEmptyAttr.get(eclass)); } else { /* try subclass fields before superclass fields */ for (int j = fields.length; --j >= 0; ) { - if (fields[j] != null) - return new AttrItemIter(tmpl, i, j); + if (fields[j] != null){ + EntryRep set = tmpl.attributeSetTemplates[i]; + Map<Object,List<SvcReg>>[] attrMaps = + serviceByAttr.get(getDefiningClass(set.eclass,j)); + List<SvcReg> svcs = null; + if (attrMaps != null && attrMaps[j] != null) { + svcs = attrMaps[j].get(set.fields[j]); + } + return new AttrItemIter(tmpl, svcs); + } } } } @@ -4988,6 +4987,16 @@ class RegistrarImpl implements Registrar if (constructionException != null) throw constructionException; concurrentObj.writeLock(); try { + if (log != null) { + inRecovery = true; + log.recover(); + inRecovery = false; + } + // log snapshot recovers myServiceID + if (myServiceID == null) { + myServiceID = newServiceID(); + } + computeMaxLeases(); // Make sure we're exporting with correct login context. AccessController.doPrivileged(new PrivilegedExceptionAction<Object>(){ @@ -4996,8 +5005,8 @@ class RegistrarImpl implements Registrar proxy = RegistrarProxy.getInstance(myRef, myServiceID); myLocator = (proxy instanceof RemoteMethodControl) ? new ConstrainableLookupLocator( - unicastDiscoveryHost, unicaster.port, null) : - new LookupLocator(unicastDiscoveryHost, unicaster.port); + unicastDiscoveryHost, unicast.port, null) : + new LookupLocator(unicastDiscoveryHost, unicast.port); /* register myself */ Item item = new Item(new ServiceItem(myServiceID, proxy, @@ -5120,7 +5129,6 @@ class RegistrarImpl implements Registrar addService(reg); generateEvents(null, nitem, now); addLogRecord(new SvcRegisteredLogObj(reg)); - queueEvents(); /* see if the expire thread needs to wake up earlier */ if (reg.leaseExpiration < minSvcExpiration) { minSvcExpiration = reg.leaseExpiration; @@ -5583,7 +5591,7 @@ class RegistrarImpl implements Registrar /* force a re-sort: must remove before changing, then reinsert */ serviceByTime.remove(reg); reg.leaseExpiration = renewExpiration; - serviceByTime.put(reg, reg); + serviceByTime.add(reg); /* see if the expire thread needs to wake up earlier */ if (renewExpiration < minSvcExpiration) { minSvcExpiration = renewExpiration; @@ -5605,7 +5613,7 @@ class RegistrarImpl implements Registrar /* force a re-sort: must remove before changing, then reinsert */ serviceByTime.remove(reg); reg.leaseExpiration = renewExpiration; - serviceByTime.put(reg, reg); + serviceByTime.add(reg); } finally { concurrentObj.writeUnlock(); } @@ -5842,17 +5850,6 @@ class RegistrarImpl implements Registrar eventNotifierExec.execute(new EventTask(reg, sid, item, transition, proxy, this)); } - /** Queue all pending EventTasks for processing by the task manager. */ - private void queueEvents() { -// if (!newNotifies.isEmpty()) { -// Iterator<EventTask> i = newNotifies.iterator(); -// while (i.hasNext()){ -// eventNotifierExec.execute(i.next()); -// } -// newNotifies.clear(); -// } - } - /** Generate a new service ID */ private ServiceID newServiceID() { Uuid uuid = serviceIdGenerator.generate(); @@ -5902,7 +5899,7 @@ class RegistrarImpl implements Registrar stream.writeInt(unicastPort); stream.writeObject(memberGroups); stream.writeObject(lookupGroups); - stream.writeLong(announcementSeqNo); + stream.writeLong(announcementSeqNo.get()); marshalAttributes(lookupAttrs, stream); marshalLocators(lookupLocators, stream); for (Iterator iter = serviceByID.entrySet().iterator(); @@ -5969,7 +5966,7 @@ class RegistrarImpl implements Registrar unicastPort = stream.readInt(); memberGroups = (String[])stream.readObject(); lookupGroups = (String[])stream.readObject(); - announcementSeqNo = stream.readLong() + Integer.MAX_VALUE; + announcementSeqNo.set( stream.readLong() + Integer.MAX_VALUE); lookupAttrs = unmarshalAttributes(stream); lookupLocators = prepareLocators( unmarshalLocators(stream), recoveredLocatorPreparer, true); @@ -6048,9 +6045,9 @@ class RegistrarImpl implements Registrar logger.log(Level.FINER, "wrote log record {0}", new Object[]{ rec }); } - if (++logFileSize >= persistenceSnapshotThreshold) { + if (logFileSize.incrementAndGet() >= persistenceSnapshotThreshold) { int snapshotSize = serviceByID.size() + eventByID.size(); - if (logFileSize >= persistenceSnapshotWeight * snapshotSize) { + if (logFileSize.get() >= persistenceSnapshotWeight * snapshotSize) { concurrentObj.waiterNotify(snapshotNotifier); } } Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/InterruptedStatusThread.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/InterruptedStatusThread.java?rev=1549810&r1=1549809&r2=1549810&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/InterruptedStatusThread.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/InterruptedStatusThread.java Tue Dec 10 11:20:57 2013 @@ -26,14 +26,27 @@ package com.sun.jini.thread; */ public class InterruptedStatusThread extends Thread { + /** + * A Runnable task can implement this to be interrupted if something + * special needs to be done to cause the Runnable to notice it's been + * interrupted. + * + * @since 3.0.0 + */ + public interface Interruptable { + public void interrupt(); + } + /** true if thread has been interrupted */ private boolean interrupted = false; + private final Interruptable task; /** * Constructs a new <code>InterruptedStatusThread</code> object. */ public InterruptedStatusThread() { super(); + task = null; } /** @@ -42,6 +55,8 @@ public class InterruptedStatusThread ext */ public InterruptedStatusThread(Runnable target) { super(target); + if (target instanceof Interruptable) task = (Interruptable) target; + else task = null; } /** @@ -51,6 +66,8 @@ public class InterruptedStatusThread ext */ public InterruptedStatusThread(Runnable target, String name) { super(target, name); + if (target instanceof Interruptable) task = (Interruptable) target; + else task = null; } /** @@ -59,6 +76,7 @@ public class InterruptedStatusThread ext */ public InterruptedStatusThread(String name) { super(name); + task = null; } /** @@ -68,6 +86,8 @@ public class InterruptedStatusThread ext */ public InterruptedStatusThread(ThreadGroup group, Runnable target) { super(group, target); + if (target instanceof Interruptable) task = (Interruptable) target; + else task = null; } /** @@ -80,6 +100,8 @@ public class InterruptedStatusThread ext Runnable target, String name) { super(group, target, name); + if (target instanceof Interruptable) task = (Interruptable) target; + else task = null; } /** @@ -95,6 +117,8 @@ public class InterruptedStatusThread ext String name, long stackSize) { super(group, target, name, stackSize); + if (target instanceof Interruptable) task = (Interruptable) target; + else task = null; } /** @@ -104,11 +128,13 @@ public class InterruptedStatusThread ext */ public InterruptedStatusThread(ThreadGroup group, String name) { super(group, name); + task = null; } // inherit javadoc public synchronized void interrupt() { interrupted = true; + if (task != null) task.interrupt(); super.interrupt(); }
