Author: aadamchik
Date: Mon Jan 23 21:03:48 2012
New Revision: 1234984

URL: http://svn.apache.org/viewvc?rev=1234984&view=rev
Log:
CAY-1653 Improving DefaultEventManager concurrency - non-bocking DispatchQueue

The important lock on "dispatchEvent" method is removed

Modified:
    cayenne/main/trunk/docs/doc/src/main/resources/RELEASE-NOTES.txt
    
cayenne/main/trunk/framework/cayenne-jdk1.5-unpublished/src/main/java/org/apache/cayenne/event/DispatchQueue.java

Modified: cayenne/main/trunk/docs/doc/src/main/resources/RELEASE-NOTES.txt
URL: 
http://svn.apache.org/viewvc/cayenne/main/trunk/docs/doc/src/main/resources/RELEASE-NOTES.txt?rev=1234984&r1=1234983&r2=1234984&view=diff
==============================================================================
--- cayenne/main/trunk/docs/doc/src/main/resources/RELEASE-NOTES.txt (original)
+++ cayenne/main/trunk/docs/doc/src/main/resources/RELEASE-NOTES.txt Mon Jan 23 
21:03:48 2012
@@ -28,6 +28,7 @@ CAY-1642 [PATCH] add ability to customiz
 CAY-1643 Updated Velocity templates to use static keys in setters/getters
 CAY-1649 Use BOOLEAN data type on Derby. Require Derby version >= 10.7.1.1.
 CAY-1651 ObjectContext.localObject - defer FaultFailureExceptions on temporary 
IDs
+CAY-1653 Improving DefaultEventManager concurrency - non-bocking DispatchQueue
 
 Bug Fixes Since 3.1M3:
 

Modified: 
cayenne/main/trunk/framework/cayenne-jdk1.5-unpublished/src/main/java/org/apache/cayenne/event/DispatchQueue.java
URL: 
http://svn.apache.org/viewvc/cayenne/main/trunk/framework/cayenne-jdk1.5-unpublished/src/main/java/org/apache/cayenne/event/DispatchQueue.java?rev=1234984&r1=1234983&r2=1234984&view=diff
==============================================================================
--- 
cayenne/main/trunk/framework/cayenne-jdk1.5-unpublished/src/main/java/org/apache/cayenne/event/DispatchQueue.java
 (original)
+++ 
cayenne/main/trunk/framework/cayenne-jdk1.5-unpublished/src/main/java/org/apache/cayenne/event/DispatchQueue.java
 Mon Jan 23 21:03:48 2012
@@ -17,16 +17,14 @@
  *  under the License.
  ****************************************************************/
 
-
 package org.apache.cayenne.event;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.cayenne.event.DefaultEventManager.Dispatch;
 import org.apache.cayenne.util.Invocation;
@@ -41,24 +39,35 @@ import org.apache.cayenne.util.Invocatio
  */
 class DispatchQueue {
 
-    private Set<Invocation> subjectInvocations = new HashSet<Invocation>();
-    private Map<Object, Collection<Invocation>> invocationsBySender = new 
WeakHashMap<Object, Collection<Invocation>>();
+    private final ConcurrentMap<Invocation, Object> subjectInvocations;
+    private final Map<Object, ConcurrentMap<Invocation, Object>> 
invocationsBySender;
+
+    DispatchQueue() {
+        subjectInvocations = new ConcurrentHashMap<Invocation, Object>();
+
+        // TODO: need something like com.google.common.collect.MapMaker to 
avoid
+        // synchronization on invocationsBySender
+        invocationsBySender = new WeakHashMap<Object, 
ConcurrentMap<Invocation, Object>>();
+    }
 
     /**
      * Dispatches event to all listeners in the queue that are registered for 
this event
      * and sender.
      */
-    synchronized void dispatchEvent(Dispatch dispatch) {
+    void dispatchEvent(Dispatch dispatch) {
         // dispatch to "any sender" listeners
-        dispatchEvent(subjectInvocations, dispatch);
+        dispatchEvent(subjectInvocations.keySet(), dispatch);
 
         // dispatch to the given sender listeners
         Object sender = dispatch.getSender();
-        dispatchEvent(invocationsForSender(sender, false), dispatch);
+        Map<Invocation, Object> senderInvocations = 
invocationsForSender(sender, false);
+        if (senderInvocations != null) {
+            dispatchEvent(senderInvocations.keySet(), dispatch);
+        }
     }
 
-    synchronized void addInvocation(Invocation invocation, Object sender) {
-        Collection<Invocation> invocations;
+    void addInvocation(Invocation invocation, Object sender) {
+        ConcurrentMap<Invocation, Object> invocations;
 
         if (sender == null) {
             invocations = subjectInvocations;
@@ -71,7 +80,7 @@ class DispatchQueue {
         // result in a memory leak per CAY-770. This seemed to happen when 
lots of
         // invocations got registered, but no events were dispatched (hence 
the stale
         // invocation removal during dispatch did not happen)
-        Iterator<Invocation> it = invocations.iterator();
+        Iterator<Invocation> it = invocations.keySet().iterator();
         while (it.hasNext()) {
             Invocation i = it.next();
             if (i.getTarget() == null) {
@@ -79,10 +88,10 @@ class DispatchQueue {
             }
         }
 
-        invocations.add(invocation);
+        invocations.putIfAbsent(invocation, Boolean.TRUE);
     }
 
-    synchronized boolean removeInvocations(Object listener, Object sender) {
+    boolean removeInvocations(Object listener, Object sender) {
 
         // remove only for specific sender
         if (sender != null) {
@@ -92,32 +101,44 @@ class DispatchQueue {
         // remove listener from all collections
         boolean didRemove = removeInvocations(subjectInvocations, listener);
 
-        for (Collection<Invocation> senderInvocations : 
invocationsBySender.values()) {
-            didRemove = removeInvocations(senderInvocations, listener) || 
didRemove;
+        synchronized (invocationsBySender) {
+            for (ConcurrentMap<Invocation, Object> senderInvocations : 
invocationsBySender
+                    .values()) {
+                didRemove = removeInvocations(senderInvocations, listener) || 
didRemove;
+            }
         }
 
         return didRemove;
     }
 
-    private Collection<Invocation> invocationsForSender(Object sender, boolean 
create) {
-        Collection<Invocation> senderInvocations = 
invocationsBySender.get(sender);
-        if (create && senderInvocations == null) {
-            senderInvocations = new HashSet<Invocation>();
-            invocationsBySender.put(sender, senderInvocations);
-        }
+    private ConcurrentMap<Invocation, Object> invocationsForSender(
+            Object sender,
+            boolean create) {
+
+        synchronized (invocationsBySender) {
+
+            ConcurrentMap<Invocation, Object> senderInvocations = 
invocationsBySender
+                    .get(sender);
+            if (create && senderInvocations == null) {
+                senderInvocations = new ConcurrentHashMap<Invocation, 
Object>();
+                invocationsBySender.put(sender, senderInvocations);
+            }
 
-        return senderInvocations;
+            return senderInvocations;
+        }
     }
 
     // removes all invocations for a given listener
-    private boolean removeInvocations(Collection<Invocation> invocations, 
Object listener) {
+    private boolean removeInvocations(
+            ConcurrentMap<Invocation, Object> invocations,
+            Object listener) {
         if (invocations == null || invocations.isEmpty()) {
             return false;
         }
 
         boolean didRemove = false;
 
-        Iterator<Invocation> invocationsIt = invocations.iterator();
+        Iterator<Invocation> invocationsIt = invocations.keySet().iterator();
         while (invocationsIt.hasNext()) {
             Invocation invocation = invocationsIt.next();
             if (invocation.getTarget() == listener) {
@@ -131,17 +152,15 @@ class DispatchQueue {
 
     // dispatches event to a list of listeners
     private void dispatchEvent(Collection<Invocation> invocations, Dispatch 
dispatch) {
-        if (invocations == null || invocations.isEmpty()) {
-            return;
-        }
 
-        // iterate over copy of the collection as there is a chance a caller 
would want to
-        // (un)register another listener during event processing
-        for (Invocation invocation : new ArrayList<Invocation>(invocations)) {
-            // fire invocation, detect if anything went wrong (e.g. GC'ed 
invocation
-            // targets)
+        Iterator<Invocation> it = invocations.iterator();
+        while (it.hasNext()) {
+
+            // fire invocation, clean up GC'd invocations...
+
+            Invocation invocation = it.next();
             if (!dispatch.fire(invocation)) {
-                invocations.remove(invocation);
+                it.remove();
             }
         }
     }


Reply via email to