Author: rajdavies
Date: Thu Aug 17 06:07:24 2006
New Revision: 432224

URL: http://svn.apache.org/viewvc?rev=432224&view=rev
Log:
removed areas of contention (deadlocks)

Modified:
    
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
    
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java

Modified: 
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java?rev=432224&r1=432223&r2=432224&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
 (original)
+++ 
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
 Thu Aug 17 06:07:24 2006
@@ -155,7 +155,7 @@
     /**
      * @see java.lang.Runnable#run()
      */
-    synchronized public void run() {
+    public void run() {
         log.debug("Running"); 
         while (true) {
             log.debug("run loop start");            

Modified: 
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java?rev=432224&r1=432223&r2=432224&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
 (original)
+++ 
incubator/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
 Thu Aug 17 06:07:24 2006
@@ -35,6 +35,8 @@
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @version $Revision$ $Date$
@@ -46,9 +48,9 @@
     private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
     private final int maxSessions;
 
-    private ArrayList idleSessions = new ArrayList();
-    private LinkedList activeSessions = new LinkedList();
-    private boolean closing = false;
+    private List idleSessions = new CopyOnWriteArrayList();
+    private List activeSessions = new CopyOnWriteArrayList();
+    private AtomicBoolean closing = new AtomicBoolean(false);
 
     public ServerSessionPoolImpl(ActiveMQEndpointWorker 
activeMQAsfEndpointWorker, int maxSessions) {
         this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
@@ -92,15 +94,15 @@
 
     /**
      */
-    synchronized public ServerSession getServerSession() throws JMSException {
+    public ServerSession getServerSession() throws JMSException {
         log.debug("ServerSession requested.");
-        if (closing) {
+        if (closing.get()) {
             throw new JMSException("Session Pool Shutting Down.");
         }
 
         if (idleSessions.size() > 0) {
             ServerSessionImpl ss = (ServerSessionImpl) 
idleSessions.remove(idleSessions.size() - 1);
-            activeSessions.addLast(ss);
+            activeSessions.add(ss);
             log.debug("Using idle session: " + ss);
             return ss;
         } else {
@@ -121,7 +123,7 @@
 
                 return getExistingServerSession();
             }
-            activeSessions.addLast(ss);
+            activeSessions.add(ss);
             log.debug("Created a new session: " + ss);
             return ss;
         }
@@ -154,20 +156,22 @@
      * @return
      */
     private ServerSession getExistingServerSession() {
-        ServerSessionImpl ss = (ServerSessionImpl) 
activeSessions.removeFirst();
-        activeSessions.addLast(ss);
+        ServerSessionImpl ss = (ServerSessionImpl) activeSessions.remove(0);
+        activeSessions.add(ss);
         log.debug("Reusing an active session: " + ss);
         return ss;
     }
 
-    synchronized public void returnToPool(ServerSessionImpl ss) {
+    public void returnToPool(ServerSessionImpl ss) {
         log.debug("Session returned to pool: " + ss);
         activeSessions.remove(ss);
         idleSessions.add(ss);
-        notify();
+        synchronized(closing){
+            closing.notify();
+        }
     }
 
-    synchronized public void removeFromPool(ServerSessionImpl ss) {
+     public void removeFromPool(ServerSessionImpl ss) {
         activeSessions.remove(ss);
         try {
             ActiveMQSession session = (ActiveMQSession) ss.getSession();
@@ -179,16 +183,19 @@
             log.error("Error redispatching unconsumed messages from stale 
session", t);        
         }
         ss.close();
-        notify();
+        synchronized(closing){
+            closing.notify();
+        }
     }
 
     public void close() {
-        synchronized (this) {
-            closing = true;
+        synchronized (closing) {
+            closing.set(true);
             closeIdleSessions();
             while( activeSessions.size() > 0 ) {
+                System.out.println("ACtive Sessions = " + 
activeSessions.size());
                 try {
-                    wait();
+                    closing.wait(1000);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     return;
@@ -209,14 +216,14 @@
      * @return Returns the closing.
      */
     public boolean isClosing(){
-        return closing;
+        return closing.get();
     }
 
     /**
      * @param closing The closing to set.
      */
     public void setClosing(boolean closing){
-        this.closing=closing;
+        this.closing.set(closing);
     }
 
 }


Reply via email to