Author: rajdavies
Date: Thu Aug 17 06:07:24 2006
New Revision: 432224
URL: http://svn.apache.org/viewvc?rev=432224&view=rev
Log:
removed areas of contention (deadlocks)
Modified:
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
Modified:
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java?rev=432224&r1=432223&r2=432224&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
(original)
+++
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
Thu Aug 17 06:07:24 2006
@@ -155,7 +155,7 @@
/**
* @see java.lang.Runnable#run()
*/
- synchronized public void run() {
+ public void run() {
log.debug("Running");
while (true) {
log.debug("run loop start");
Modified:
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java?rev=432224&r1=432223&r2=432224&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
(original)
+++
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
Thu Aug 17 06:07:24 2006
@@ -35,6 +35,8 @@
import org.apache.activemq.command.MessageDispatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* @version $Revision$ $Date$
@@ -46,9 +48,9 @@
private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
private final int maxSessions;
- private ArrayList idleSessions = new ArrayList();
- private LinkedList activeSessions = new LinkedList();
- private boolean closing = false;
+ private List idleSessions = new CopyOnWriteArrayList();
+ private List activeSessions = new CopyOnWriteArrayList();
+ private AtomicBoolean closing = new AtomicBoolean(false);
public ServerSessionPoolImpl(ActiveMQEndpointWorker
activeMQAsfEndpointWorker, int maxSessions) {
this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
@@ -92,15 +94,15 @@
/**
*/
- synchronized public ServerSession getServerSession() throws JMSException {
+ public ServerSession getServerSession() throws JMSException {
log.debug("ServerSession requested.");
- if (closing) {
+ if (closing.get()) {
throw new JMSException("Session Pool Shutting Down.");
}
if (idleSessions.size() > 0) {
ServerSessionImpl ss = (ServerSessionImpl)
idleSessions.remove(idleSessions.size() - 1);
- activeSessions.addLast(ss);
+ activeSessions.add(ss);
log.debug("Using idle session: " + ss);
return ss;
} else {
@@ -121,7 +123,7 @@
return getExistingServerSession();
}
- activeSessions.addLast(ss);
+ activeSessions.add(ss);
log.debug("Created a new session: " + ss);
return ss;
}
@@ -154,20 +156,22 @@
* @return
*/
private ServerSession getExistingServerSession() {
- ServerSessionImpl ss = (ServerSessionImpl)
activeSessions.removeFirst();
- activeSessions.addLast(ss);
+ ServerSessionImpl ss = (ServerSessionImpl) activeSessions.remove(0);
+ activeSessions.add(ss);
log.debug("Reusing an active session: " + ss);
return ss;
}
- synchronized public void returnToPool(ServerSessionImpl ss) {
+ public void returnToPool(ServerSessionImpl ss) {
log.debug("Session returned to pool: " + ss);
activeSessions.remove(ss);
idleSessions.add(ss);
- notify();
+ synchronized(closing){
+ closing.notify();
+ }
}
- synchronized public void removeFromPool(ServerSessionImpl ss) {
+ public void removeFromPool(ServerSessionImpl ss) {
activeSessions.remove(ss);
try {
ActiveMQSession session = (ActiveMQSession) ss.getSession();
@@ -179,16 +183,19 @@
log.error("Error redispatching unconsumed messages from stale
session", t);
}
ss.close();
- notify();
+ synchronized(closing){
+ closing.notify();
+ }
}
public void close() {
- synchronized (this) {
- closing = true;
+ synchronized (closing) {
+ closing.set(true);
closeIdleSessions();
while( activeSessions.size() > 0 ) {
+ System.out.println("ACtive Sessions = " +
activeSessions.size());
try {
- wait();
+ closing.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
@@ -209,14 +216,14 @@
* @return Returns the closing.
*/
public boolean isClosing(){
- return closing;
+ return closing.get();
}
/**
* @param closing The closing to set.
*/
public void setClosing(boolean closing){
- this.closing=closing;
+ this.closing.set(closing);
}
}