Author: aadamchik
Date: Mon Jan 23 21:03:48 2012
New Revision: 1234984
URL: http://svn.apache.org/viewvc?rev=1234984&view=rev
Log:
CAY-1653 Improving DefaultEventManager concurrency - non-blocking DispatchQueue
The important lock on "dispatchEvent" method is removed
Modified:
cayenne/main/trunk/docs/doc/src/main/resources/RELEASE-NOTES.txt
cayenne/main/trunk/framework/cayenne-jdk1.5-unpublished/src/main/java/org/apache/cayenne/event/DispatchQueue.java
Modified: cayenne/main/trunk/docs/doc/src/main/resources/RELEASE-NOTES.txt
URL:
http://svn.apache.org/viewvc/cayenne/main/trunk/docs/doc/src/main/resources/RELEASE-NOTES.txt?rev=1234984&r1=1234983&r2=1234984&view=diff
==============================================================================
--- cayenne/main/trunk/docs/doc/src/main/resources/RELEASE-NOTES.txt (original)
+++ cayenne/main/trunk/docs/doc/src/main/resources/RELEASE-NOTES.txt Mon Jan 23
21:03:48 2012
@@ -28,6 +28,7 @@ CAY-1642 [PATCH] add ability to customiz
CAY-1643 Updated Velocity templates to use static keys in setters/getters
CAY-1649 Use BOOLEAN data type on Derby. Require Derby version >= 10.7.1.1.
CAY-1651 ObjectContext.localObject - defer FaultFailureExceptions on temporary
IDs
+CAY-1653 Improving DefaultEventManager concurrency - non-blocking DispatchQueue
Bug Fixes Since 3.1M3:
Modified:
cayenne/main/trunk/framework/cayenne-jdk1.5-unpublished/src/main/java/org/apache/cayenne/event/DispatchQueue.java
URL:
http://svn.apache.org/viewvc/cayenne/main/trunk/framework/cayenne-jdk1.5-unpublished/src/main/java/org/apache/cayenne/event/DispatchQueue.java?rev=1234984&r1=1234983&r2=1234984&view=diff
==============================================================================
---
cayenne/main/trunk/framework/cayenne-jdk1.5-unpublished/src/main/java/org/apache/cayenne/event/DispatchQueue.java
(original)
+++
cayenne/main/trunk/framework/cayenne-jdk1.5-unpublished/src/main/java/org/apache/cayenne/event/DispatchQueue.java
Mon Jan 23 21:03:48 2012
@@ -17,16 +17,14 @@
* under the License.
****************************************************************/
-
package org.apache.cayenne.event;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.cayenne.event.DefaultEventManager.Dispatch;
import org.apache.cayenne.util.Invocation;
@@ -41,24 +39,35 @@ import org.apache.cayenne.util.Invocatio
*/
class DispatchQueue {
- private Set<Invocation> subjectInvocations = new HashSet<Invocation>();
- private Map<Object, Collection<Invocation>> invocationsBySender = new
WeakHashMap<Object, Collection<Invocation>>();
+ private final ConcurrentMap<Invocation, Object> subjectInvocations;
+ private final Map<Object, ConcurrentMap<Invocation, Object>>
invocationsBySender;
+
+ DispatchQueue() {
+ subjectInvocations = new ConcurrentHashMap<Invocation, Object>();
+
+ // TODO: need something like com.google.common.collect.MapMaker to
avoid
+ // synchronization on invocationsBySender
+ invocationsBySender = new WeakHashMap<Object,
ConcurrentMap<Invocation, Object>>();
+ }
/**
* Dispatches event to all listeners in the queue that are registered for
this event
* and sender.
*/
- synchronized void dispatchEvent(Dispatch dispatch) {
+ void dispatchEvent(Dispatch dispatch) {
// dispatch to "any sender" listeners
- dispatchEvent(subjectInvocations, dispatch);
+ dispatchEvent(subjectInvocations.keySet(), dispatch);
// dispatch to the given sender listeners
Object sender = dispatch.getSender();
- dispatchEvent(invocationsForSender(sender, false), dispatch);
+ Map<Invocation, Object> senderInvocations =
invocationsForSender(sender, false);
+ if (senderInvocations != null) {
+ dispatchEvent(senderInvocations.keySet(), dispatch);
+ }
}
- synchronized void addInvocation(Invocation invocation, Object sender) {
- Collection<Invocation> invocations;
+ void addInvocation(Invocation invocation, Object sender) {
+ ConcurrentMap<Invocation, Object> invocations;
if (sender == null) {
invocations = subjectInvocations;
@@ -71,7 +80,7 @@ class DispatchQueue {
// result in a memory leak per CAY-770. This seemed to happen when
lots of
// invocations got registered, but no events were dispatched (hence
the stale
// invocation removal during dispatch did not happen)
- Iterator<Invocation> it = invocations.iterator();
+ Iterator<Invocation> it = invocations.keySet().iterator();
while (it.hasNext()) {
Invocation i = it.next();
if (i.getTarget() == null) {
@@ -79,10 +88,10 @@ class DispatchQueue {
}
}
- invocations.add(invocation);
+ invocations.putIfAbsent(invocation, Boolean.TRUE);
}
- synchronized boolean removeInvocations(Object listener, Object sender) {
+ boolean removeInvocations(Object listener, Object sender) {
// remove only for specific sender
if (sender != null) {
@@ -92,32 +101,44 @@ class DispatchQueue {
// remove listener from all collections
boolean didRemove = removeInvocations(subjectInvocations, listener);
- for (Collection<Invocation> senderInvocations :
invocationsBySender.values()) {
- didRemove = removeInvocations(senderInvocations, listener) ||
didRemove;
+ synchronized (invocationsBySender) {
+ for (ConcurrentMap<Invocation, Object> senderInvocations :
invocationsBySender
+ .values()) {
+ didRemove = removeInvocations(senderInvocations, listener) ||
didRemove;
+ }
}
return didRemove;
}
- private Collection<Invocation> invocationsForSender(Object sender, boolean
create) {
- Collection<Invocation> senderInvocations =
invocationsBySender.get(sender);
- if (create && senderInvocations == null) {
- senderInvocations = new HashSet<Invocation>();
- invocationsBySender.put(sender, senderInvocations);
- }
+ private ConcurrentMap<Invocation, Object> invocationsForSender(
+ Object sender,
+ boolean create) {
+
+ synchronized (invocationsBySender) {
+
+ ConcurrentMap<Invocation, Object> senderInvocations =
invocationsBySender
+ .get(sender);
+ if (create && senderInvocations == null) {
+ senderInvocations = new ConcurrentHashMap<Invocation,
Object>();
+ invocationsBySender.put(sender, senderInvocations);
+ }
- return senderInvocations;
+ return senderInvocations;
+ }
}
// removes all invocations for a given listener
- private boolean removeInvocations(Collection<Invocation> invocations,
Object listener) {
+ private boolean removeInvocations(
+ ConcurrentMap<Invocation, Object> invocations,
+ Object listener) {
if (invocations == null || invocations.isEmpty()) {
return false;
}
boolean didRemove = false;
- Iterator<Invocation> invocationsIt = invocations.iterator();
+ Iterator<Invocation> invocationsIt = invocations.keySet().iterator();
while (invocationsIt.hasNext()) {
Invocation invocation = invocationsIt.next();
if (invocation.getTarget() == listener) {
@@ -131,17 +152,15 @@ class DispatchQueue {
// dispatches event to a list of listeners
private void dispatchEvent(Collection<Invocation> invocations, Dispatch
dispatch) {
- if (invocations == null || invocations.isEmpty()) {
- return;
- }
- // iterate over copy of the collection as there is a chance a caller
would want to
- // (un)register another listener during event processing
- for (Invocation invocation : new ArrayList<Invocation>(invocations)) {
- // fire invocation, detect if anything went wrong (e.g. GC'ed
invocation
- // targets)
+ Iterator<Invocation> it = invocations.iterator();
+ while (it.hasNext()) {
+
+ // fire invocation, clean up GC'd invocations...
+
+ Invocation invocation = it.next();
if (!dispatch.fire(invocation)) {
- invocations.remove(invocation);
+ it.remove();
}
}
}