Author: peter_firmstone Date: Sat Jul 20 11:37:27 2013 New Revision: 1505125
URL: http://svn.apache.org/r1505125 Log: Backed out changes to maps that were made concurrent in Reggie. Fixed unsynchronized access in Reggie to use ReadersWriter concurrentObj. Updated Txn and TxnTable to ensure collections are thread safe in Outrigger. Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Txn.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnTable.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Txn.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Txn.java?rev=1505125&r1=1505124&r2=1505125&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Txn.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Txn.java Sat Jul 20 11:37:27 2013 @@ -85,7 +85,7 @@ import net.jini.security.ProxyPreparer; * * @author Sun Microsystems, Inc. 
*/ -class Txn implements TransactableMgr, TransactionConstants, StorableObject<Txn> { +class Txn implements TransactableMgr, TransactionConstants, StorableObject<Txn>, Comparable<Txn> { /** The internal id Outrigger as assigned to the transaction */ final private long id; @@ -514,4 +514,25 @@ class Txn implements TransactableMgr, Tr } return this; } + + @Override + public int compareTo(Txn o) { + if (o == null) return -1; + if (o.id < id) return -1; + if (o.id > id) return 1; + return 0; + } + + @Override + public int hashCode() { + int hash = 7; + hash = 31 * hash + (int) (this.id ^ (this.id >>> 32)); + return hash; + } + + public boolean equals(Object o){ + if (!(o instanceof Txn)) return false; + Txn txn = (Txn) o; + return id == txn.id; + } } Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnTable.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnTable.java?rev=1505125&r1=1505124&r2=1505125&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnTable.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnTable.java Sat Jul 20 11:37:27 2013 @@ -17,20 +17,18 @@ */ package com.sun.jini.outrigger; +import com.sun.jini.logging.Levels; import java.io.IOException; -import java.util.Map; -import java.util.List; import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.logging.Logger; - -import net.jini.core.transaction.server.TransactionManager; import net.jini.core.transaction.server.ServerTransaction; +import net.jini.core.transaction.server.TransactionManager; import net.jini.security.ProxyPreparer; -import com.sun.jini.logging.Levels; -import java.util.concurrent.ConcurrentHashMap; -import 
java.util.concurrent.ConcurrentMap; - /** * Keeps a mapping from {@link TransactionManager}/id pairs, to {@link Txn} * objects. Some <code>Txn</code>s may be <i>broken</i>, that is the @@ -124,17 +122,14 @@ class TxnTable { * <code>Txn</code> objects that have the id. <code>null</code> if there are * no broken <code>Txn</code>s. */ - private final ConcurrentMap<Long,List<Txn>> brokenTxns = new ConcurrentHashMap<Long,List<Txn>>(); + private final ConcurrentMap<Long,Set<Txn>> brokenTxns = new ConcurrentHashMap<Long,Set<Txn>>(); /** * <code>ProxyPreparer</code> to use when unpacking transactions, may be * <code>null</code>. */ private final ProxyPreparer proxyPreparer; - - /** An array of type <code>Txn</code> to pass to <code>toArray</code> */ - private static final Txn[] txnArray = new Txn[0]; - + /** The logger to use */ static private final Logger logger = Logger .getLogger(OutriggerServerImpl.txnLoggerName); @@ -180,34 +175,20 @@ class TxnTable { Txn get(TransactionManager manager, long id) throws IOException, ClassNotFoundException { final Long idAsLong; - final Txn brokenTxnsForId[]; + Set<Txn> txnsForId; // Try the table of non-broken txns first { - final Txn r = (Txn) txns.get(new Key(manager, id, false)); + final Txn r = txns.get(new Key(manager, id, false)); if (r != null) return r; - - // Check broken txns -// if (brokenTxns.isEmpty()) return null;// No broken Txns so txns is definitive idAsLong = Long.valueOf(id); - final List txnsForId = (List) brokenTxns.get(idAsLong); + txnsForId = brokenTxns.get(idAsLong); if (txnsForId == null) /* * Broken Txns, but none with the right ID so txns is definitive * for this manager/id pair. */ return null; - - /* - * If we are here there are broken Txns with the specified id, we - * need to try and fix each one and check to see if there is a - * match. Make a copy of the list so we don't need to hold a lock - * while making fix attempts (which in general involve remote - * communications). 
- */ - synchronized (txnsForId){ - brokenTxnsForId = (Txn[]) txnsForId.toArray(txnArray); - } } /* @@ -216,23 +197,24 @@ class TxnTable { */ Txn match = null; Throwable t = null; // The first throwable we get, if any - final List<Txn> fixed = new java.util.LinkedList<Txn>(); // fixed Txns - for (int i = 0; i < brokenTxnsForId.length; i++) { + Iterator<Txn> txnsit = txnsForId.iterator(); + while (txnsit.hasNext()){ try { - final ServerTransaction st = brokenTxnsForId[i] + Txn txn = txnsit.next(); + final ServerTransaction st = txn .getTransaction(proxyPreparer); /* - * We fixed a Txn, remember so we can move it to txns. + * We fixed a Txn, so we can move it to txns. */ - fixed.add(brokenTxnsForId[i]); - + put(txn); + txnsit.remove(); /* * Did this match? Pass unprepared manager to prepared one! */ if (st.mgr.equals(manager)) { // bingo! - match = brokenTxnsForId[i]; + match = txn; break; } } catch (Throwable tt) { @@ -247,49 +229,39 @@ class TxnTable { if (t == null) t = tt; + // We could potentially swallow an Error if we find a match + // Is this advisable? A denial of service + // attack could throw an Error. But then it's not + // necessarily a good thing to swallow an Error either. } } - - /* - * Now that all the remote operations are done we - * can move anything that was fixed. - */ - if (!fixed.isEmpty()) { - - if (!brokenTxns.isEmpty()) { - final List<Txn> txnsForId = brokenTxns.get(idAsLong); - if (txnsForId != null) { - // Remove from brokenTxns - synchronized (txnsForId){ - txnsForId.removeAll(fixed); - // can we get rid of txnsForId and brokenTxns? - if (!txnsForId.isEmpty()){ - // Make sure we only remove txnsForId and not some other collection. 
- brokenTxns.remove(idAsLong, txnsForId); - } - } - // Put in txns - for (Iterator i = fixed.iterator(); i.hasNext();) { - put((Txn) i.next()); - } - } else { - /* - * if the list for this id is gone the fixed Txns must - * have already been moved by someone else - */ - } + + // Now remove empty collection from brokenTxns so it doesn't leak memory + while (txnsForId.isEmpty()){ + boolean removed = brokenTxns.remove(idAsLong, txnsForId); + if (!removed) break; + try { + Thread.sleep(10L); // Allow time for other threads to complete. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); // Restore interrupt. + } + // Check that set is still empty. + if (removed && !txnsForId.isEmpty()){ + // Somebody had a reference and added to the set. + Set<Txn> existed = brokenTxns.putIfAbsent(idAsLong, txnsForId); + if (existed != null){ + // Someone else replaced our set! + // Add all to this new collection + existed.addAll(txnsForId); + txnsForId = existed; + } } else { - /* - * if brokenTxns is gone the fixed Txns must have already - * been moved by someone else - */ + break; } - - } + } // Did we run out of candidates, or did we find a match? - if (match != null) - return match; + if (match != null) return match; /* * Was there some Txn that may have been a match but is still broken so @@ -398,32 +370,15 @@ class TxnTable { if (st == null) { // txn is broken, put in brokenTxns final Long id = Long.valueOf(txn.getTransactionId()); - List<Txn> txnsForId = brokenTxns.get(id); + Set<Txn> txnsForId = brokenTxns.get(id); if (txnsForId == null) { - txnsForId = new java.util.LinkedList<Txn>(); - txnsForId.add(txn); // Never add an empty collection. - List<Txn> existed = brokenTxns.putIfAbsent(id, txnsForId); - while (existed != null) { - synchronized (existed){ - // Never add to an empty collection, it may have been removed. - // Try to replace it with our shiny new collection. 
- if (existed.isEmpty()){ - boolean replaced = brokenTxns.replace(id, existed, txnsForId); - if (!replaced){ - //This means someone else has replaced it first. - existed = brokenTxns.putIfAbsent(id, txnsForId); - //Existed will be another instance, very low probability of null. - //If it is another instance we're not sync'd on it until next loop. - } - } else { - existed.add(txn); - existed = null; - } - - } + txnsForId = new ConcurrentSkipListSet<Txn>(); + Set<Txn> existed = brokenTxns.putIfAbsent(id, txnsForId); + if (existed != null) { + txnsForId = existed; } - } - + } + txnsForId.add(txn); } else { // Txn is ok, put it in txns final Key k = new Key(st.mgr, st.id, true); Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java?rev=1505125&r1=1505124&r2=1505125&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java Sat Jul 20 11:37:27 2013 @@ -207,7 +207,7 @@ class RegistrarImpl implements Registrar * Map from ServiceID to SvcReg. Every service is in this map under * its serviceID. */ - private final Map<ServiceID,SvcReg> serviceByID = new ConcurrentHashMap<ServiceID,SvcReg>(); + private final Map<ServiceID,SvcReg> serviceByID = new HashMap<ServiceID,SvcReg>(); /** * Identity map from SvcReg to SvcReg, ordered by lease expiration. * Every service is in this map. @@ -246,12 +246,12 @@ class RegistrarImpl implements Registrar * Map from Long(eventID) to EventReg. Every event registration is in * this map under its eventID. 
*/ - private final Map<Long,EventReg> eventByID = new ConcurrentHashMap<Long,EventReg>(11); + private final Map<Long,EventReg> eventByID = new HashMap<Long,EventReg>(11); /** * Identity map from EventReg to EventReg, ordered by lease expiration. * Every event registration is in this map. */ - private final SortedMap<EventReg,EventReg> eventByTime = new ConcurrentSkipListMap<EventReg,EventReg>(); + private final SortedMap<EventReg,EventReg> eventByTime = new TreeMap<EventReg,EventReg>(); /** * Map from ServiceID to EventReg or EventReg[]. An event * registration is in this map if its template matches on (at least) @@ -690,7 +690,7 @@ class RegistrarImpl implements Registrar * * @serial */ - public long leaseExpiration; + private long leaseExpiration; /** Simple constructor */ public EventReg(long eventID, Uuid leaseID, Template tmpl, @@ -705,6 +705,19 @@ class RegistrarImpl implements Registrar this.handback = handback; this.leaseExpiration = leaseExpiration; } + + synchronized long incrementSeqNo(long increment){ + seqNo += increment; + return seqNo; + } + + synchronized long incrementAndGetSeqNo(){ + return ++seqNo; + } + + synchronized long getSeqNo(){ + return seqNo; + } /** * Primary sort by leaseExpiration, secondary by eventID. 
The @@ -715,8 +728,8 @@ class RegistrarImpl implements Registrar EventReg reg = (EventReg)obj; if (this == reg) return 0; - if (leaseExpiration < reg.leaseExpiration || - (leaseExpiration == reg.leaseExpiration && + if (getLeaseExpiration() < reg.getLeaseExpiration() || + (getLeaseExpiration() == reg.getLeaseExpiration() && eventID < reg.eventID)) return -1; return 1; @@ -777,6 +790,20 @@ class RegistrarImpl implements Registrar "failed to recover event listener", e); } } + + /** + * @return the leaseExpiration + */ + synchronized long getLeaseExpiration() { + return leaseExpiration; + } + + /** + * @param leaseExpiration the leaseExpiration to set + */ + synchronized void setLeaseExpiration(long leaseExpiration) { + this.leaseExpiration = leaseExpiration; + } } /** @@ -822,11 +849,16 @@ class RegistrarImpl implements Registrar * @see RegistrarImpl.LocalLogHandler#applyUpdate */ public void apply(RegistrarImpl regImpl) { - SvcReg oldReg = - (SvcReg)regImpl.serviceByID.get(reg.item.serviceID); - if (oldReg != null) - regImpl.deleteService(oldReg, 0); - regImpl.addService(reg); + regImpl.concurrentObj.writeLock(); + try { + SvcReg oldReg = + (SvcReg)regImpl.serviceByID.get(reg.item.serviceID); + if (oldReg != null) + regImpl.deleteService(oldReg, 0); + regImpl.addService(reg); + } finally { + regImpl.concurrentObj.writeUnlock(); + } } } @@ -877,13 +909,16 @@ class RegistrarImpl implements Registrar * @see RegistrarImpl.LocalLogHandler#applyUpdate */ public void apply(RegistrarImpl regImpl) { + regImpl.concurrentObj.writeLock(); try { regImpl.addAttributesDo(serviceID, leaseID, attrSets); } catch (UnknownLeaseException e) { /* this exception should never occur when recovering */ throw new AssertionError("an UnknownLeaseException should" + " never occur during recovery"); - } + } finally { + regImpl.concurrentObj.writeUnlock(); + } } } @@ -942,6 +977,7 @@ class RegistrarImpl implements Registrar * @see RegistrarImpl.LocalLogHandler#applyUpdate */ public void 
apply(RegistrarImpl regImpl) { + regImpl.concurrentObj.writeLock(); try { regImpl.modifyAttributesDo(serviceID, leaseID, attrSetTmpls, attrSets); @@ -949,7 +985,9 @@ class RegistrarImpl implements Registrar /* this exception should never occur when recovering */ throw new AssertionError("an UnknownLeaseException should" + " never occur during recovery"); - } + } finally { + regImpl.concurrentObj.writeUnlock(); + } } } @@ -1000,11 +1038,14 @@ class RegistrarImpl implements Registrar * @see RegistrarImpl.LocalLogHandler#applyUpdate */ public void apply(RegistrarImpl regImpl) { + regImpl.concurrentObj.writeLock(); try { regImpl.setAttributesDo(serviceID, leaseID, attrSets); } catch (UnknownLeaseException e) { /* this exception should never occur when recovering */ - } + } finally { + regImpl.concurrentObj.writeUnlock(); + } } } @@ -1038,12 +1079,21 @@ class RegistrarImpl implements Registrar * @see RegistrarImpl.LocalLogHandler#applyUpdate */ public void apply(RegistrarImpl regImpl) { - eventReg.prepareListener(regImpl.recoveredListenerPreparer); - eventReg.seqNo += Integer.MAX_VALUE; - regImpl.addEvent(eventReg); - regImpl.eventID++; + synchronized (eventReg){ // Atomic + eventReg.prepareListener(regImpl.recoveredListenerPreparer); + eventReg.incrementSeqNo(Integer.MAX_VALUE); + } + regImpl.concurrentObj.writeLock(); + try{ + regImpl.addEvent(eventReg); + regImpl.eventID++; + } finally { + regImpl.concurrentObj.writeUnlock(); + } } } + + /** * LogObj class whose instances are recorded to the log file whenever @@ -1083,11 +1133,14 @@ class RegistrarImpl implements Registrar * @see RegistrarImpl.LocalLogHandler#applyUpdate */ public void apply(RegistrarImpl regImpl) { + regImpl.concurrentObj.writeLock(); try { regImpl.cancelServiceLeaseDo(serviceID, leaseID); } catch (UnknownLeaseException e) { /* this exception should never occur when recovering */ - } + } finally { + regImpl.concurrentObj.writeUnlock(); + } } } @@ -1180,11 +1233,14 @@ class RegistrarImpl implements 
Registrar * @see RegistrarImpl.LocalLogHandler#applyUpdate */ public void apply(RegistrarImpl regImpl) { + regImpl.concurrentObj.writeLock(); try { regImpl.cancelEventLeaseDo(eventID, leaseID); } catch (UnknownLeaseException e) { /* this exception should never occur when recovering */ - } + } finally { + regImpl.concurrentObj.writeUnlock(); + } } } @@ -1328,7 +1384,12 @@ class RegistrarImpl implements Registrar /* Exceptions can be returned, since we didn't weed out unknown * leases before logging, but we can just ignore them anyway. */ - regImpl.cancelLeasesDo(regIDs, leaseIDs); + regImpl.concurrentObj.writeLock(); + try { + regImpl.cancelLeasesDo(regIDs, leaseIDs); + } finally { + regImpl.concurrentObj.writeUnlock(); + } } } @@ -1523,8 +1584,8 @@ class RegistrarImpl implements Registrar * @see RegistrarImpl.LocalLogHandler#applyUpdate */ public void apply(RegistrarImpl regImpl) { - regImpl.lookupAttrs = attrs; - } + regImpl.lookupAttrs = attrs; + } /** * Writes attributes as a null-terminated list of MarshalledInstances. 
@@ -1975,7 +2036,7 @@ class RegistrarImpl implements Registrar int transition) { this.reg = reg; - seqNo = ++reg.seqNo; + seqNo = reg.incrementAndGetSeqNo(); this.sid = sid; this.item = item; this.transition = transition; @@ -2335,8 +2396,8 @@ class RegistrarImpl implements Registrar minEventExpiration = Long.MAX_VALUE; while (!eventByTime.isEmpty()) { EventReg reg = eventByTime.firstKey(); - if (reg.leaseExpiration > now) { - minEventExpiration = reg.leaseExpiration; + if (reg.getLeaseExpiration() > now) { + minEventExpiration = reg.getLeaseExpiration(); break; } deleteEvent(reg); @@ -5095,8 +5156,8 @@ class RegistrarImpl implements Registrar addEvent(reg); addLogRecord(new EventRegisteredLogObj(reg)); /* see if the expire thread needs to wake up earlier */ - if (reg.leaseExpiration < minEventExpiration) { - minEventExpiration = reg.leaseExpiration; + if (reg.getLeaseExpiration() < minEventExpiration) { + minEventExpiration = reg.getLeaseExpiration(); concurrentObj.waiterNotify(eventNotifier); } return new EventRegistration( @@ -5106,9 +5167,8 @@ class RegistrarImpl implements Registrar myRef, myServiceID, reg.eventID, - reg.leaseID, - reg.leaseExpiration), - reg.seqNo); + reg.leaseID, reg.getLeaseExpiration()), + reg.getSeqNo()); } /** @@ -5407,10 +5467,10 @@ class RegistrarImpl implements Registrar private EventReg checkEvent(Uuid leaseID, long eventID, long now) throws UnknownLeaseException { - EventReg reg = (EventReg)eventByID.get(Long.valueOf(eventID)); + EventReg reg = eventByID.get(Long.valueOf(eventID)); if (reg == null) throw new UnknownLeaseException("No event recorded for ID: " + eventID); if (!reg.leaseID.equals(leaseID)) throw new UnknownLeaseException("Incorrect lease ID: " + eventID + " not equal to reg lease ID: " + reg.leaseID); - if (reg.leaseExpiration <= now) throw new UnknownLeaseException("Lease expired"); + if (reg.getLeaseExpiration() <= now) throw new UnknownLeaseException("Lease expired"); return reg; } @@ -5450,13 +5510,18 @@ class 
RegistrarImpl implements Registrar Uuid leaseID, long renewExpiration) { - SvcReg reg = serviceByID.get(serviceID); - if (reg == null || !reg.leaseID.equals(leaseID)) - return; - /* force a re-sort: must remove before changing, then reinsert */ - serviceByTime.remove(reg); - reg.leaseExpiration = renewExpiration; - serviceByTime.put(reg, reg); + concurrentObj.writeLock(); + try { + SvcReg reg = serviceByID.get(serviceID); + if (reg == null || !reg.leaseID.equals(leaseID)) + return; + /* force a re-sort: must remove before changing, then reinsert */ + serviceByTime.remove(reg); + reg.leaseExpiration = renewExpiration; + serviceByTime.put(reg, reg); + } finally { + concurrentObj.writeUnlock(); + } } /** The code that does the real work of cancelEventLease. */ @@ -5470,7 +5535,7 @@ class RegistrarImpl implements Registrar // throw new UnknownLeaseException(); deleteEvent(reg); /* wake up thread if this might be the (only) earliest time */ - if (reg.leaseExpiration == minEventExpiration) + if (reg.getLeaseExpiration() == minEventExpiration) concurrentObj.waiterNotify(eventNotifier); } @@ -5500,12 +5565,12 @@ class RegistrarImpl implements Registrar throw new IllegalArgumentException("negative lease duration"); EventReg reg = checkEvent(leaseID, eventID, now); if (renewDuration > maxEventLease && - renewDuration > reg.leaseExpiration - now) - renewDuration = Math.max(reg.leaseExpiration - now, maxEventLease); + renewDuration > reg.getLeaseExpiration() - now) + renewDuration = Math.max(reg.getLeaseExpiration() - now, maxEventLease); long renewExpiration = now + renewDuration; /* force a re-sort: must remove before changing, then reinsert */ eventByTime.remove(reg); - reg.leaseExpiration = renewExpiration; + reg.setLeaseExpiration(renewExpiration); eventByTime.put(reg, reg); /* see if the expire thread needs to wake up earlier */ if (renewExpiration < minEventExpiration) { @@ -5520,13 +5585,18 @@ class RegistrarImpl implements Registrar Uuid leaseID, long renewExpiration) 
{ - EventReg reg = (EventReg)eventByID.get(Long.valueOf(eventID)); - if (reg == null || !reg.leaseID.equals(leaseID)) - return; - /* force a re-sort: must remove before changing, then reinsert */ - eventByTime.remove(reg); - reg.leaseExpiration = renewExpiration; - eventByTime.put(reg, reg); + concurrentObj.writeLock(); + try { + EventReg reg = (EventReg)eventByID.get(Long.valueOf(eventID)); + if (reg == null || !reg.leaseID.equals(leaseID)) + return; + /* force a re-sort: must remove before changing, then reinsert */ + eventByTime.remove(reg); + reg.setLeaseExpiration(renewExpiration); + eventByTime.put(reg, reg); + } finally { + concurrentObj.writeUnlock(); + } } /** @@ -5652,7 +5722,7 @@ class RegistrarImpl implements Registrar ServiceID sid, long now) { - if (reg.leaseExpiration <= now) + if (reg.getLeaseExpiration() <= now) return; if ((reg.transitions & ServiceRegistrar.TRANSITION_NOMATCH_MATCH) != 0 && @@ -5836,7 +5906,8 @@ class RegistrarImpl implements Registrar EventReg eReg; while ((eReg = (EventReg)stream.readObject()) != null) { eReg.prepareListener(recoveredListenerPreparer); - eReg.seqNo += Integer.MAX_VALUE; +// eReg.seqNo += Integer.MAX_VALUE; + eReg.incrementSeqNo(Integer.MAX_VALUE); addEvent(eReg); } }
