Author: peter_firmstone Date: Mon May 12 14:24:40 2014 New Revision: 1593970
URL: http://svn.apache.org/r1593970 Log: Fixed some latent concurrency bugs within JERI, that had caused issues with Reggie Event delivery for attribute changes, this also allowed simplification of synchronization within Reggie. Profile testing indicates that this runs very close to socket speed, with no contention. Slight change to Distributed protocol. Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/connection/BasicConnManagerFactory.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/SerialReflectionFactory.java Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/connection/BasicConnManagerFactory.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/connection/BasicConnManagerFactory.java?rev=1593970&r1=1593969&r2=1593970&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/connection/BasicConnManagerFactory.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/connection/BasicConnManagerFactory.java Mon May 12 14:24:40 2014 @@ -43,12 +43,14 @@ public class BasicConnManagerFactory imp * @throws NullPointerException if <code>endpoint</code> is * <code>null</code> */ + @Override public ConnManager create(final ConnectionEndpoint endpoint) { return new ConnManager() { private final ConnectionManager manager = - new ConnectionManager(endpoint); + new ConnectionManager(endpoint); + @Override public OutboundRequestIterator newRequest( OutboundRequestHandle handle) { Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java?rev=1593970&r1=1593969&r2=1593970&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Mux.java Mon May 12 14:24:40 2014 @@ -36,6 +36,7 @@ import java.util.BitSet; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import java.util.Queue; import java.util.logging.Level; import java.util.logging.Logger; @@ -88,7 +89,7 @@ abstract class Mux { new GetThreadPoolAction(false)); /** session shutdown tasks to be executed asynchronously */ - private static final LinkedList sessionShutdownQueue = new LinkedList(); + private static final Queue<Runnable> sessionShutdownQueue = new LinkedList<Runnable>(); private static class SessionShutdownTask implements Runnable { private final Session[] sessions; @@ -134,7 +135,7 @@ abstract class Mux { Throwable muxDownCause; final BitSet busySessions = new BitSet(); - final Map sessions = new HashMap(5); + final Map<Integer,Session> sessions = new HashMap<Integer,Session>(128); private int expectedPingCookie = -1; @@ -205,13 +206,14 @@ abstract class Mux { * see uninitialized state. */ public void start() throws IOException { - if (role == CLIENT) { - readState = READ_SERVER_CONNECTION_HEADER; - } else { - assert role == SERVER; - readState = READ_CLIENT_CONNECTION_HEADER; - } - + synchronized (readStateLock){ + if (role == CLIENT) { + readState = READ_SERVER_CONNECTION_HEADER; + } else { + assert role == SERVER; + readState = READ_CLIENT_CONNECTION_HEADER; + } + } try { connectionIO.start(); } catch (IOException e) { @@ -220,8 +222,8 @@ abstract class Mux { } if (role == CLIENT) { - asyncSendClientConnectionHeader(); synchronized (muxLock) { + asyncSendClientConnectionHeader(); long now = System.currentTimeMillis(); long endTime = now + this.startTimeout; while (!muxDown && !clientConnectionReady) { @@ -287,39 +289,37 @@ abstract class Mux { * This method MAY be invoked while synchronized on muxLock. */ final void setDown(final String message, final Throwable cause) { + boolean needWorker = false; synchronized (muxLock) { - if (muxDown) { - return; - } + if (muxDown) return; muxDown = true; muxDownMessage = message; muxDownCause = cause; muxLock.notifyAll(); - } - - /* - * The following should be safe because we just left the - * synchonized block, and after setting the muxDown latch - * therein, no other thread should ever touch the "sessions" - * data structure. - * - * Sessions are shut down asynchronously in a separate thread - * to avoid deadlock, in case our caller holds muxLock, - * because individual session locks must never be acquired - * while holding muxLock. - */ - boolean needWorker = false; - synchronized (sessionShutdownQueue) { - if (!sessions.values().isEmpty()) { - sessionShutdownQueue.add(new SessionShutdownTask( - (Session[]) sessions.values().toArray( - new Session[sessions.values().size()]), - message, cause)); - needWorker = true; - } else { - needWorker = !sessionShutdownQueue.isEmpty(); - } - } + /* + * The following should be safe because we just left the + * synchonized block, and after setting the muxDown latch + * therein, no other thread should ever touch the "sessions" + * data structure. + * + * Sessions are shut down asynchronously in a separate thread + * to avoid deadlock, in case our caller holds muxLock, + * because individual session locks must never be acquired + * while holding muxLock. + */ + + synchronized (sessionShutdownQueue) { + if (!sessions.isEmpty()) { + sessionShutdownQueue.add(new SessionShutdownTask( + (Session[]) sessions.values().toArray( + new Session[sessions.values().size()]), + message, cause)); + needWorker = true; + } else { + needWorker = !sessionShutdownQueue.isEmpty(); + } + } + } if (needWorker) { try { systemThreadPool.execute(new Runnable() { @@ -327,11 +327,8 @@ abstract class Mux { while (true) { Runnable task; synchronized (sessionShutdownQueue) { - if (sessionShutdownQueue.isEmpty()) { - break; - } - task = (Runnable) - sessionShutdownQueue.removeFirst(); + if (sessionShutdownQueue.isEmpty()) break; + task = sessionShutdownQueue.remove(); } task.run(); } Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java?rev=1593970&r1=1593969&r2=1593970&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java Mon May 12 14:24:40 2014 @@ -80,6 +80,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.PriorityQueue; +import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.SortedMap; @@ -220,7 +222,7 @@ class RegistrarImpl implements Registrar * Map from ServiceID to SvcReg. Every service is in this map under * its serviceID. */ - private final Map<ServiceID,SvcReg> serviceByID = new HashMap<ServiceID,SvcReg>(); + private final Map<ServiceID,SvcReg> serviceByID = new HashMap<ServiceID,SvcReg>(200); /** * Identity map from SvcReg to SvcReg, ordered by lease expiration. * Every service is in this map. @@ -231,7 +233,7 @@ class RegistrarImpl implements Registrar * is in this map under its types. */ private final Map<String,Map<ServiceID,SvcReg>> serviceByTypeName - = new HashMap<String,Map<ServiceID,SvcReg>>(); + = new HashMap<String,Map<ServiceID,SvcReg>>(200); /** * Map from EntryClass to HashMap[] where each HashMap is a map from * Object (field value) to ArrayList(SvcReg). The HashMap array has as @@ -259,23 +261,23 @@ class RegistrarImpl implements Registrar * Map from Long(eventID) to EventReg. Every event registration is in * this map under its eventID. */ - private final Map<Long,EventReg> eventByID = new HashMap<Long,EventReg>(11); + private final Map<Long,EventReg> eventByID = new HashMap<Long,EventReg>(200); /** * Identity map from EventReg to EventReg, ordered by lease expiration. * Every event registration is in this map. */ - private final SortedMap<EventReg,EventReg> eventByTime = new TreeMap<EventReg,EventReg>(); + private final Queue<EventReg> eventByTime = new PriorityQueue<EventReg>(); /** * Map from ServiceID to EventReg or EventReg[]. An event * registration is in this map if its template matches on (at least) * a specific serviceID. */ - private final Map<ServiceID,Object> subEventByService = new HashMap<ServiceID,Object>(11); + private final Map<ServiceID,Object> subEventByService = new HashMap<ServiceID,Object>(200); /** * Map from Long(eventID) to EventReg. An event registration is in * this map if its template matches on ANY_SERVICE_ID. */ - private final Map<Long,EventReg> subEventByID = new HashMap<Long,EventReg>(11); + private final Map<Long,EventReg> subEventByID = new HashMap<Long,EventReg>(200); /** Generator for resource (e.g., registration, lease) Uuids */ private final UuidGenerator resourceIdGenerator; @@ -527,7 +529,7 @@ class RegistrarImpl implements Registrar unicastDiscoveryConstraints = init.unicastDiscoveryConstraints; context = init.context; eventNotifierExec = new SynchronousExecutors(init.scheduledExecutor); - eventTaskMap = new TreeMap<EventReg,ExecutorService>(); + eventTaskMap = new HashMap<EventReg,ExecutorService>(200); discoveryResponseExec = init.executor; ReliableLog log = null; Thread serviceExpirer = null; @@ -729,19 +731,35 @@ class RegistrarImpl implements Registrar this.leaseExpiration = leaseExpiration; } - synchronized long incrementSeqNo(long increment){ - seqNo += increment; - return seqNo; - } - - synchronized long incrementAndGetSeqNo(){ + long incrementAndGetSeqNo(){ return ++seqNo; } - synchronized long getSeqNo(){ + long getSeqNo(){ return seqNo; } + @Override + public int hashCode() { + int hash = 7; + hash = 97 * hash + (int) (this.eventID ^ (this.eventID >>> 32)); + hash = 97 * hash + (this.leaseID != null ? this.leaseID.hashCode() : 0); + hash = 97 * hash + this.transitions; + hash = 97 * hash + (this.handback != null ? this.handback.hashCode() : 0); + return hash; + } + + @Override + public boolean equals(Object o){ + if (this == o) return true; + if (!(o instanceof EventReg)) return false; + EventReg that = (EventReg) o; + if (this.eventID != that.eventID) return false; + if (this.transitions != that.transitions) return false; + if (!this.leaseID.equals(that.leaseID)) return false; // leaseID never null + return this.handback.equals(that.handback); + } + /** * Primary sort by leaseExpiration, secondary by eventID. The * secondary sort is immaterial, except to ensure a total order @@ -749,8 +767,7 @@ class RegistrarImpl implements Registrar */ public int compareTo(Object obj) { EventReg reg = (EventReg)obj; - if (this == reg) - return 0; + if (equals(obj)) return 0; if (getLeaseExpiration() < reg.getLeaseExpiration() || (getLeaseExpiration() == reg.getLeaseExpiration() && eventID < reg.eventID)) @@ -762,7 +779,7 @@ class RegistrarImpl implements Registrar * Prepares listener (if non-null) using the given proxy preparer. If * preparation fails, the listener field is set to null. */ - public void prepareListener(ProxyPreparer preparer) { + void prepareListener(ProxyPreparer preparer) { if (listener != null) { try { listener = @@ -779,6 +796,7 @@ class RegistrarImpl implements Registrar } listener = null; } + seqNo += Integer.MAX_VALUE; } } @@ -1102,12 +1120,9 @@ class RegistrarImpl implements Registrar * @see RegistrarImpl.LocalLogHandler#applyUpdate */ public void apply(RegistrarImpl regImpl) { - synchronized (eventReg){ // Atomic - eventReg.prepareListener(regImpl.recoveredListenerPreparer); - eventReg.incrementSeqNo(Integer.MAX_VALUE); - } regImpl.concurrentObj.writeLock(); try{ + eventReg.prepareListener(regImpl.recoveredListenerPreparer); regImpl.addEvent(eventReg); regImpl.eventID++; } finally { @@ -2033,12 +2048,15 @@ class RegistrarImpl implements Registrar private final Registrar registrar; /* the time of the event */ private final long now; + /* The listener */ + private final RemoteEventListener listener; /** Simple constructor, except increments reg.seqNo. */ public EventTask(EventReg reg, ServiceID sid, Item item, int transition, RegistrarProxy proxy, Registrar registrar, long now) { this.reg = reg; - seqNo = reg.incrementAndGetSeqNo(); + this.listener = reg.listener; + seqNo = reg.incrementAndGetSeqNo(); this.sid = sid; this.item = item; this.transition = transition; @@ -2053,10 +2071,10 @@ class RegistrarImpl implements Registrar logger.log( Level.FINE, "notifying listener {0} of event {1}", - new Object[]{ reg.listener, Long.valueOf(reg.eventID) }); + new Object[]{ listener, Long.valueOf(reg.eventID) }); } try { - reg.listener.notify(new RegistrarEvent(proxy, reg.eventID, + listener.notify(new RegistrarEvent(proxy, reg.eventID, seqNo, reg.handback, sid, transition, item)); return Boolean.TRUE; @@ -2092,18 +2110,8 @@ class RegistrarImpl implements Registrar } } - /** Keep events going to the same listener ordered. - * Returns a positive integer if depends, 0 if - * no dependency exists and a negative integer if - * precedes. - */ - public boolean dependsOn(EventTask obj) { - if (obj == null) return false; - if (reg.listener.equals(obj.reg.listener)) return true; - return false; - } - /** + * Keep events going to the same listener ordered. * This is inconsistent with Object.equals, it is simply intended to * order tasks by priority. * @param o @@ -2437,8 +2445,8 @@ class RegistrarImpl implements Registrar while (!Thread.currentThread().isInterrupted()) { long now = System.currentTimeMillis(); reggie.minEventExpiration = Long.MAX_VALUE; - while (!reggie.eventByTime.isEmpty()) { - EventReg reg = reggie.eventByTime.firstKey(); + for (EventReg reg = reggie.eventByTime.poll(); + reg != null; reg = reggie.eventByTime.poll()) { if (reg.getLeaseExpiration() > now) { reggie.minEventExpiration = reg.getLeaseExpiration(); break; @@ -4298,7 +4306,7 @@ class RegistrarImpl implements Registrar } Long id = Long.valueOf(reg.eventID); eventByID.put(id, reg); - eventByTime.put(reg, reg); + eventByTime.offer(reg); eventTaskMap.put(reg, eventNotifierExec.newSerialExecutor(new PriorityBlockingQueue()) ); if (reg.tmpl.serviceID != null) { Object val = subEventByService.get(reg.tmpl.serviceID); @@ -5701,7 +5709,7 @@ class RegistrarImpl implements Registrar /* force a re-sort: must remove before changing, then reinsert */ eventByTime.remove(reg); reg.setLeaseExpiration(renewExpiration); - eventByTime.put(reg, reg); + eventByTime.offer(reg); /* see if the expire thread needs to wake up earlier */ if (renewExpiration < minEventExpiration) { minEventExpiration = renewExpiration; @@ -5717,13 +5725,13 @@ class RegistrarImpl implements Registrar { concurrentObj.writeLock(); try { - EventReg reg = (EventReg)eventByID.get(Long.valueOf(eventID)); + EventReg reg = eventByID.get(Long.valueOf(eventID)); if (reg == null || !reg.leaseID.equals(leaseID)) return; /* force a re-sort: must remove before changing, then reinsert */ eventByTime.remove(reg); reg.setLeaseExpiration(renewExpiration); - eventByTime.put(reg, reg); + eventByTime.offer(reg); } finally { concurrentObj.writeUnlock(); } @@ -5741,7 +5749,8 @@ class RegistrarImpl implements Registrar { long now = System.currentTimeMillis(); Exception[] exceptions = null; - for (int i = 0; i < regIDs.length; i++) { + int l = regIDs.length; + for (int i = 0; i < l; i++) { Object id = regIDs[i]; try { if (id instanceof ServiceID) @@ -6026,7 +6035,6 @@ class RegistrarImpl implements Registrar EventReg eReg; while ((eReg = (EventReg)stream.readObject()) != null) { eReg.prepareListener(recoveredListenerPreparer); - eReg.incrementSeqNo(Integer.MAX_VALUE); addEvent(eReg); } } Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java?rev=1593970&r1=1593969&r2=1593970&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/jeri/connection/ConnectionManager.java Mon May 12 14:24:40 2014 @@ -295,7 +295,7 @@ public final class ConnectionManager { /** * Subclass wrapper around MuxClient for outbound connections. */ - private final class OutboundMux extends MuxClient { + private static final class OutboundMux extends MuxClient { /** * The outbound connection. */ @@ -358,7 +358,7 @@ public final class ConnectionManager { */ @Override public OutboundRequest newRequest() throws IOException { - assert !Thread.holdsLock(ConnectionManager.this); +// assert !Thread.holdsLock(ConnectionManager.this); boolean interrupted = false; try { boolean start = false; @@ -369,8 +369,8 @@ public final class ConnectionManager { startLock.wait(); } catch (InterruptedException ex) { interrupted = true; - } - } + } + } if (notStarted){ starting = true; start = true; @@ -378,25 +378,27 @@ public final class ConnectionManager { } if (start){ try { - start(); - } finally { + synchronized (this){ + start(); + } + } finally { synchronized (startLock){ notStarted = false; starting = false; startLock.notifyAll(); - } - } - } + } + } + } } synchronized (this) { - idleTime = 0; - return super.newRequest(); - } + idleTime = 0; + } + return super.newRequest(); } finally { synchronized (this) { assert pendingNewRequests > 0; pendingNewRequests--; - } + } if (interrupted) Thread.currentThread().interrupt(); } } @@ -547,7 +549,7 @@ public final class ConnectionManager { * Calls readResponseData on the connection, exactly once. * Sets the handle to null to indicate that it has been called. */ - private synchronized void readFirst() throws IOException { + private void readFirst() throws IOException { if (handle != null) { try { IOException e = c.readResponseData(handle, in); @@ -645,7 +647,11 @@ public final class ConnectionManager { /** * Outbound request iterator returned by newRequest. */ - private final class ReqIterator implements OutboundRequestIterator { + private static final class ReqIterator implements OutboundRequestIterator { + /** + * ConnectionManager + */ + private final ConnectionManager manager; /** * The request handle. */ @@ -659,8 +665,9 @@ public final class ConnectionManager { */ private OutboundMux mux; - ReqIterator(OutboundRequestHandle handle) { + ReqIterator(OutboundRequestHandle handle, ConnectionManager cm) { this.handle = handle; + manager = cm; } /** @@ -686,7 +693,7 @@ public final class ConnectionManager { throw new NoSuchElementException(); } first = false; - mux = connect(handle); + mux = manager.connect(handle); OutboundRequest req = mux.newRequest(); OutboundRequest sreq = null; try { @@ -695,7 +702,7 @@ public final class ConnectionManager { sreq = new Outbound(req, c, handle); } finally { if (sreq == null) { - remove(mux); + manager.remove(mux); } } return sreq; @@ -759,6 +766,6 @@ public final class ConnectionManager { * <code>null</code> **/ public OutboundRequestIterator newRequest(OutboundRequestHandle handle) { - return new ReqIterator(handle); + return new ReqIterator(handle, this); } } Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/SerialReflectionFactory.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/SerialReflectionFactory.java?rev=1593970&r1=1593969&r2=1593970&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/SerialReflectionFactory.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/io/SerialReflectionFactory.java Mon May 12 14:24:40 2014 @@ -219,9 +219,7 @@ public final class SerialReflectionFacto * object to do so if needs to. */ out.writeObject(parameterTypes); - int l = parameters != null ? parameters.length : 0; - // Write length to stream. - out.writeByte(l); + int l = parameterTypes != null ? parameterTypes.length : 0; for (int i = 0; i < l; i++){ writeObject(parameters[i], out); } @@ -339,7 +337,7 @@ public final class SerialReflectionFacto classOrObject = in.readObject(); method = (String) in.readObject(); parameterTypes = (Class[]) in.readObject(); - byte len = in.readByte(); + int len = parameterTypes != null ? parameterTypes.length : 0; parameters = len == 0 ? null : new Object[len]; for (int i = 0; i < len; i++){ parameters[i] = readObject(in);
