Author: peter_firmstone
Date: Thu May 9 15:15:17 2013
New Revision: 1480680
URL: http://svn.apache.org/r1480680

Log:
Minor improvements, including fixing some race conditions found in AbstractDgcClient

Modified:
    river/jtsk/skunk/qa_refactor/trunk/build.xml
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Session.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/runtime/AbstractDgcClient.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/start/AggregatePolicyProvider.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/start/LoaderSplitPolicyProvider.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/loader/pref/PreferredClassProvider.java
    river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/net/RFC3986URLClassLoader.java
    river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/security/CombinerSecurityManager.java
    river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/security/DelegatePermission.java

Modified: river/jtsk/skunk/qa_refactor/trunk/build.xml
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/build.xml?rev=1480680&r1=1480679&r2=1480680&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/build.xml (original)
+++ river/jtsk/skunk/qa_refactor/trunk/build.xml Thu May 9 15:15:17 2013
@@ -368,8 +368,10 @@
 <package name="net.jini.url.file"/>
 <package name="net.jini.url.httpmd"/>
 <package name="net.jini.url.https"/>
+ <package name="org.apache.river.api.common"/>
 <package name="org.apache.river.api.io"/>
 <package name="org.apache.river.api.lookup"/>
+ <package name="org.apache.river.api.net"/>
 <package name="org.apache.river.api.security"/>
 <package name="org.apache.river.api.util"/>
 </javadoc>

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Session.java
URL:
http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Session.java?rev=1480680&r1=1480679&r2=1480680&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Session.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/mux/Session.java Thu May 9 15:15:17 2013 @@ -127,7 +127,7 @@ final class Session { private boolean receivedAckRequired = false; private boolean sentAcknowledgment = false; - private Collection ackListeners = null; + private final Collection<AcknowledgmentSource.Listener> ackListeners = new ArrayList<AcknowledgmentSource.Listener>(3); private boolean sentAckRequired = false; private boolean receivedAcknowledgment = false; @@ -204,9 +204,6 @@ final class Session { } synchronized (sessionLock) { if (outState < FINISHED) { - if (ackListeners == null) { - ackListeners = new ArrayList(3); - } ackListeners.add(listener); return true; } else { @@ -553,7 +550,7 @@ final class Session { } private void notifyAcknowledgmentListeners(final boolean received) { - if (ackListeners != null) { + if (!ackListeners.isEmpty()) { systemThreadPool.execute(new Runnable() { public void run() { Iterator iter = ackListeners.iterator(); @@ -752,7 +749,7 @@ final class Session { boolean eof = closeIfComplete && complete; boolean close = role == SERVER && eof && inState > OPEN; boolean ackRequired = role == SERVER && eof && - (ackListeners != null && !ackListeners.isEmpty()); + (!ackListeners.isEmpty()); int op = Mux.Data | (open ? 
Mux.Data_open : 0) | Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/runtime/AbstractDgcClient.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/runtime/AbstractDgcClient.java?rev=1480680&r1=1480679&r2=1480680&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/runtime/AbstractDgcClient.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/jeri/internal/runtime/AbstractDgcClient.java Thu May 9 15:15:17 2013 @@ -29,11 +29,14 @@ import java.rmi.RemoteException; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * AbstractDgcClient implements the client-side behavior of RMI's @@ -237,12 +240,12 @@ abstract class AbstractDgcClient { /* mutable instance state (below) is guarded by this object's lock */ /** true if this entry has been removed from the global table */ - private boolean removed = false; + private volatile boolean removed = false; /** table of refs held for endpoint: maps object ID to RefEntry */ private final Map refTable = new HashMap(5); /** set of RefEntry instances from last (failed) dirty call */ - private Set invalidRefs = new HashSet(5); + private final Set invalidRefs = new HashSet(5); /** absolute time to renew current lease to this endpoint */ private long renewTime = Long.MAX_VALUE; @@ -258,8 +261,11 @@ abstract class AbstractDgcClient { /** true if renew/clean thread may be interrupted */ private boolean interruptible = false; - /** set of clean calls that need to be made */ - private final Set pendingCleans = new HashSet(5); + /** 
set of clean calls that need to be made, changed Set to an + * underlying ConcurrentHashMap because no lock is held while + * iterating and processing pending Clean Requests*/ + private final Set<CleanRequest> pendingCleans + = Collections.newSetFromMap(new ConcurrentHashMap<CleanRequest,Boolean>(5)); private EndpointEntry(final Object endpoint) { this.endpoint = endpoint; @@ -272,6 +278,14 @@ abstract class AbstractDgcClient { private void start(){ renewCleanThread.start(); } + + boolean pendingCleanCalls(){ + return !pendingCleans.isEmpty(); + } + + boolean removed(){ + return removed; + } /** * Registers the live reference instances in the supplied list to @@ -545,7 +559,7 @@ abstract class AbstractDgcClient { long timeUntilRenew = renewTime - System.currentTimeMillis(); timeToWait = Math.max(timeUntilRenew, 1); - if (!pendingCleans.isEmpty()) { + if (pendingCleanCalls()) { timeToWait = Math.min(timeToWait, cleanInterval); } @@ -597,8 +611,8 @@ abstract class AbstractDgcClient { invalidRefs.addAll(refTable.values()); } if (!invalidRefs.isEmpty()) { - refsToDirty = invalidRefs; - invalidRefs = new HashSet(5); + refsToDirty = new HashSet(invalidRefs); + invalidRefs.clear(); } sequenceNum = getNextSequenceNum(); } @@ -608,10 +622,10 @@ abstract class AbstractDgcClient { makeDirtyCall(refsToDirty, sequenceNum); } - if (!pendingCleans.isEmpty()) { + if (pendingCleanCalls()) { makeCleanCalls(); } - } while (!removed || !pendingCleans.isEmpty()); + } while (!removed() || pendingCleanCalls()); } } @@ -701,7 +715,7 @@ abstract class AbstractDgcClient { * A similar (but different) argument can be made for * ConnectIOException. 
*/ - if (++request.connectFailures >= cleanConnectRetries) { + if (request.connectFailures.incrementAndGet() >= cleanConnectRetries) { iter.remove(); } } else { @@ -855,12 +869,12 @@ abstract class AbstractDgcClient { */ private static class CleanRequest { - long sequenceNum; - Object[] objectIDs; - boolean strong; + final long sequenceNum; + final Object[] objectIDs; + final boolean strong; /** how many times this request has failed with ConnectException */ - int connectFailures = 0; + final AtomicInteger connectFailures = new AtomicInteger(); CleanRequest(long sequenceNum, Object[] objectIDs, boolean strong) { this.sequenceNum = sequenceNum; Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java?rev=1480680&r1=1480679&r2=1480680&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java Thu May 9 15:15:17 2013 @@ -1897,7 +1897,7 @@ public class OutriggerServerImpl int found = 0; final Set conflictSet = new java.util.HashSet(); final Map provisionallyRemovedEntrySet = - RC.map(new ConcurrentHashMap(), Ref.WEAK_IDENTITY, Ref.STRONG, 1000L, 1000L); + RC.map(new ConcurrentHashMap(), Ref.WEAK_IDENTITY, Ref.STRONG, 10000L, 10000L); for (Iterator i=classes.iterator(); i.hasNext() && found < handles.length;) @@ -2263,7 +2263,7 @@ public class OutriggerServerImpl // Changed to concurrent map, because unsynchronized iteration occurs. 
final Map provisionallyRemovedEntrySet = - RC.map(new ConcurrentHashMap(), Ref.WEAK_IDENTITY, Ref.STRONG, 1000L, 1000L); + RC.map(new ConcurrentHashMap(), Ref.WEAK_IDENTITY, Ref.STRONG, 10000L, 10000L); /* * First we do the straight search @@ -2677,7 +2677,7 @@ public class OutriggerServerImpl * provisionally removed */ final private Map provisionallyRemovedEntrySet - = RC.map(new ConcurrentHashMap(), Ref.WEAK_IDENTITY, Ref.STRONG, 1000L, 1000L) ; + = RC.map(new ConcurrentHashMap(), Ref.WEAK_IDENTITY, Ref.STRONG, 10000L, 10000L) ; private ContentsQuery(Uuid uuid, EntryRep[] tmpls, Txn txn, long limit) { this.uuid = uuid; Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/start/AggregatePolicyProvider.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/start/AggregatePolicyProvider.java?rev=1480680&r1=1480679&r2=1480680&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/start/AggregatePolicyProvider.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/start/AggregatePolicyProvider.java Thu May 9 15:15:17 2013 @@ -92,7 +92,7 @@ public class AggregatePolicyProvider private static final ConcurrentMap<Class<? extends Thread>,Boolean> trustGetCCL = RC.concurrentMap( new ConcurrentHashMap<Referrer<Class<? 
extends Thread>>,Referrer<Boolean>>(), - Ref.WEAK_IDENTITY, Ref.STRONG, 1000L, 0L); + Ref.WEAK_IDENTITY, Ref.STRONG, 10000L, 10000L); private static final ProtectionDomain myDomain = AccessController.doPrivileged( new PrivilegedAction<ProtectionDomain>() { Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/start/LoaderSplitPolicyProvider.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/start/LoaderSplitPolicyProvider.java?rev=1480680&r1=1480679&r2=1480680&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/start/LoaderSplitPolicyProvider.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/start/LoaderSplitPolicyProvider.java Thu May 9 15:15:17 2013 @@ -98,7 +98,7 @@ public class LoaderSplitPolicyProvider this.defaultPolicy = defaultPolicy; delegateMap = RC.concurrentMap( new ConcurrentHashMap<Referrer<ClassLoader>,Referrer<Policy>>() - ,Ref.WEAK_IDENTITY , Ref.STRONG, 1000L, 0L); + ,Ref.WEAK_IDENTITY , Ref.STRONG, 10000L, 10000L); ensureDependenciesResolved(); } Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java?rev=1480680&r1=1480679&r2=1480680&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java Thu May 9 15:15:17 2013 @@ -111,10 +111,11 @@ final class ThreadPool implements Execut * however it was never really successful, AccessControlContext * is a much more effective way of controlling privilege. 
* - * We should change this to ensure that each task is executed in the + * We should consider changing this to ensure that each task is executed in the * AccessControlContext of the calling thread, to avoid privilege escalation. */ private final AtomicInteger threadCount; + private final AtomicInteger waitingThreads; private final int delayFactor; private static final int numberOfCores = Runtime.getRuntime().availableProcessors(); @@ -130,6 +131,7 @@ final class ThreadPool implements Execut this.threadGroup = threadGroup; queue = new SynchronousQueue<Runnable>(); //Non blocking queue. threadCount = new AtomicInteger(); + waitingThreads = new AtomicInteger(); this.delayFactor = delayFactor; } @@ -139,7 +141,16 @@ final class ThreadPool implements Execut Runnable task = new Task(runnable, name); boolean accepted = false; try { - accepted = queue.offer(task, 700 * delayFactor* (threadCount.get()/ numberOfCores), TimeUnit.MICROSECONDS); + // If there are no threads, maxDelay = 0; + // If the system is highly loaded, it takes longer for waiting + // threads to wake up and take the task, so this is designed to + // prevent a heavily loaded system from unnecessarily creating + // more threads, while also allowing threads to ramp up quickly +// long maxDelay = (threadCount.get() < 400 && waitingThreads.get() == 0) +// ? 0 : (waitingThreads.get() + 1 ) * (threadCount.get()/ numberOfCores); +// maxDelay = maxDelay * 700 * delayFactor; +// accepted = queue.offer(task, maxDelay, TimeUnit.MICROSECONDS); + accepted = queue.offer(task, waitingThreads.get() * delayFactor * 700, TimeUnit.MICROSECONDS); } catch (InterruptedException ex) { Logger.getLogger(ThreadPool.class.getName()).log(Level.SEVERE, "Calling thread interrupted", ex); // restore interrupt. @@ -222,11 +233,15 @@ final class ThreadPool implements Execut * thread.setName is not thread safe. 
*/ try { + waitingThreads.getAndIncrement(); + task = null; task = queue.poll(idleTimeout, TimeUnit.MILLISECONDS); + waitingThreads.getAndDecrement(); // thread.setName(NewThreadAction.NAME_PREFIX + task); if (task != null) task.run(); // thread.setName(NewThreadAction.NAME_PREFIX + "Idle"); } catch (InterruptedException e){ + waitingThreads.getAndDecrement(); thread.interrupt(); break; } Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/loader/pref/PreferredClassProvider.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/loader/pref/PreferredClassProvider.java?rev=1480680&r1=1480679&r2=1480680&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/loader/pref/PreferredClassProvider.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/loader/pref/PreferredClassProvider.java Thu May 9 15:15:17 2013 @@ -305,10 +305,10 @@ public class PreferredClassProvider exte urlCache = RC.concurrentMap(intern, Ref.TIME, Ref.STRONG, 10000L, 10000L); ConcurrentMap<Referrer<String>,Referrer<Uri[]>> intern1 = new ConcurrentHashMap<Referrer<String>,Referrer<Uri[]>>(); - uriCache = RC.concurrentMap(intern1, Ref.TIME, Ref.STRONG, 1000L, 1000L); + uriCache = RC.concurrentMap(intern1, Ref.TIME, Ref.STRONG, 10000L, 10000L); ConcurrentMap<Referrer<LoaderKey>,Referrer<ClassLoader>> internal = new ConcurrentHashMap<Referrer<LoaderKey>,Referrer<ClassLoader>>(); - loaderTable = RC.concurrentMap(internal, Ref.STRONG, Ref.WEAK_IDENTITY, 200L, 200L); + loaderTable = RC.concurrentMap(internal, Ref.STRONG, Ref.WEAK_IDENTITY, 5000L, 5000L); } /** Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/net/RFC3986URLClassLoader.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/net/RFC3986URLClassLoader.java?rev=1480680&r1=1480679&r2=1480680&view=diff 
============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/net/RFC3986URLClassLoader.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/net/RFC3986URLClassLoader.java Thu May 9 15:15:17 2013 @@ -34,6 +34,7 @@ import java.net.MalformedURLException; import java.net.SocketPermission; import java.net.URISyntaxException; import java.net.URL; +import java.net.URLClassLoader; import java.net.URLConnection; import java.net.URLDecoder; import java.net.URLStreamHandlerFactory; @@ -909,7 +910,7 @@ public class RFC3986URLClassLoader exten * URLClassloader}. * @return the created {@code URLClassLoader} instance. */ - public static RFC3986URLClassLoader newInstance(final URL[] urls) { + public static URLClassLoader newInstance(final URL[] urls) { final AccessControlContext context = AccessController.getContext(); RFC3986URLClassLoader sub = AccessController .doPrivileged(new PrivilegedAction<RFC3986URLClassLoader>() { @@ -933,7 +934,7 @@ public class RFC3986URLClassLoader exten * URLClassloader. * @return the created {@code URLClassLoader} instance. 
*/ - public static RFC3986URLClassLoader newInstance(final URL[] urls, + public static URLClassLoader newInstance(final URL[] urls, final ClassLoader parentCl) { final AccessControlContext context = AccessController.getContext(); RFC3986URLClassLoader sub = AccessController Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/security/CombinerSecurityManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/security/CombinerSecurityManager.java?rev=1480680&r1=1480679&r2=1480680&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/security/CombinerSecurityManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/security/CombinerSecurityManager.java Thu May 9 15:15:17 2013 @@ -122,11 +122,11 @@ extends SecurityManager implements Cachi Referrer<AccessControlContext>> internal = new NonBlockingHashMap<Referrer<AccessControlContext>, Referrer<AccessControlContext>>(); - contextCache = RC.concurrentMap(internal, Ref.TIME, Ref.STRONG, 60000L, 0L); + contextCache = RC.concurrentMap(internal, Ref.TIME, Ref.STRONG, 60000L, 60000L); ConcurrentMap<Referrer<Object>, Referrer<NavigableSet<Permission>>> refmap = new NonBlockingHashMap<Referrer<Object>, Referrer<NavigableSet<Permission>>>(); - checked = RC.concurrentMap(refmap, Ref.TIME, Ref.STRONG, 20000L, 0L); + checked = RC.concurrentMap(refmap, Ref.TIME, Ref.STRONG, 20000L, 20000L); g = new SecurityPermission("getPolicy"); Permission createAccPerm = new SecurityPermission("createAccessControlContext"); action = new Action(); @@ -251,7 +251,7 @@ extends SecurityManager implements Cachi */ NavigableSet<Referrer<Permission>> internal = new ConcurrentSkipListSet<Referrer<Permission>>(permCompare); - checkedPerms = RC.navigableSet(internal, Ref.TIME, 5000L); + checkedPerms = RC.navigableSet(internal, Ref.TIME, 10000L); 
inTrustedCodeRecursiveCall.set(Boolean.TRUE); try { NavigableSet<Permission> existed = checked.putIfAbsent(context, checkedPerms); Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/security/DelegatePermission.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/security/DelegatePermission.java?rev=1480680&r1=1480679&r2=1480680&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/security/DelegatePermission.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/api/security/DelegatePermission.java Thu May 9 15:15:17 2013 @@ -99,7 +99,7 @@ public final class DelegatePermission ex private static final ConcurrentMap instances = RC.concurrentMap( new ConcurrentSkipListMap( RC.comparator( new PermissionComparator())) - , Ref.WEAK, Ref.WEAK, 1000L, 1000L ); // Value weak too, because it references key. + , Ref.WEAK, Ref.WEAK, 10000L, 10000L ); // Value weak too, because it references key. /** * Factory method to obtain a DelegatePermission, this is essential to
