Thanks David!
On 11 Sep 2009, at 20:31, djen...@apache.org wrote:

Author: djencks
Date: Fri Sep 11 19:31:11 2009
New Revision: 813992

URL: http://svn.apache.org/viewvc?rev=813992&view=rev
Log:
AMQ-2166 unregister session proxies when they are closed. Patch (modified) from Mario Siegenthaler, slightly modified

Modified:
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java
    activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java?rev=813992&r1=813991&r2=813992&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java Fri Sep 11 19:31:11 2009
@@ -17,7 +17,7 @@
package org.apache.activemq.ra;

import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.List;

import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
@@ -34,7 +34,6 @@
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
-
import org.apache.activemq.ActiveMQQueueSession;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicSession;
@@ -42,13 +41,13 @@
/**
* Acts as a pass through proxy for a JMS Connection object. It intercepts
 * events that are of interest of the ActiveMQManagedConnection.
- *
+ *
 * @version $Revision$
 */
public class ManagedConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener {

    private ActiveMQManagedConnection managedConnection;
-    private ArrayList<ManagedSessionProxy> sessions = new ArrayList<ManagedSessionProxy>();
+    private final List<ManagedSessionProxy> sessions = new ArrayList<ManagedSessionProxy>();
    private ExceptionListener exceptionListener;

public ManagedConnectionProxy(ActiveMQManagedConnection managedConnection) {
@@ -58,7 +57,7 @@
    /**
* Used to let the ActiveMQManagedConnection that this connection handel is
     * not needed by the app.
-     *
+     *
     * @throws JMSException
     */
    public void close() throws JMSException {
@@ -73,18 +72,19 @@
    public void cleanup() {
        exceptionListener = null;
        managedConnection = null;
- for (Iterator<ManagedSessionProxy> iter = sessions.iterator(); iter.hasNext();) {
-            ManagedSessionProxy p = iter.next();
-            try {
-                p.cleanup();
-            } catch (JMSException ignore) {
+        synchronized (sessions) {
+            for (ManagedSessionProxy p : sessions) {
+                try {
+ //TODO is this dangerous? should we copy the list before iterating?
+                    p.cleanup();
+                } catch (JMSException ignore) {
+                }
            }
-            iter.remove();
+            sessions.clear();
        }
    }

    /**
-     *
* @return "physical" underlying activemq connection, if proxy is associated with a managed connection
     * @throws javax.jms.JMSException if managed connection is null
     */
@@ -96,7 +96,7 @@
    }

    /**
-     * @param transacted Whether session is transacted
+     * @param transacted      Whether session is transacted
     * @param acknowledgeMode session acknowledge mode
     * @return session proxy
     * @throws JMSException on error
@@ -106,7 +106,7 @@
    }

    /**
-     * @param transacted Whether session is transacted
+     * @param transacted      Whether session is transacted
     * @param acknowledgeMode session acknowledge mode
     * @return session proxy
     * @throws JMSException on error
@@ -115,24 +115,33 @@
if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) {
            acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        }
-//        ActiveMQSession session = (ActiveMQSession)getConnection().createSession(true, acknowledgeMode);
-        ActiveMQSession session = (ActiveMQSession)getConnection().createSession(transacted, acknowledgeMode);
+        ActiveMQSession session = (ActiveMQSession) getConnection().createSession(transacted, acknowledgeMode);
         ManagedTransactionContext txContext = new ManagedTransactionContext(managedConnection.getTransactionContext());
        session.setTransactionContext(txContext);
-        ManagedSessionProxy p = new ManagedSessionProxy(session);
+ ManagedSessionProxy p = new ManagedSessionProxy(session, this);
        p.setUseSharedTxContext(managedConnection.isInManagedTx());
-        sessions.add(p);
+        synchronized (sessions) {
+            sessions.add(p);
+        }
        return p;
    }

+    protected void sessionClosed(ManagedSessionProxy session) {
+        synchronized (sessions) {
+            sessions.remove(session);
+        }
+    }
+
public void setUseSharedTxContext(boolean enable) throws JMSException {
-        for (ManagedSessionProxy p : sessions) {
-            p.setUseSharedTxContext(enable);
+        synchronized (sessions) {
+            for (ManagedSessionProxy p : sessions) {
+                p.setUseSharedTxContext(enable);
+            }
        }
    }

    /**
-     * @param transacted Whether session is transacted
+     * @param transacted      Whether session is transacted
     * @param acknowledgeMode session acknowledge mode
     * @return session proxy
     * @throws JMSException on error
@@ -142,7 +151,7 @@
    }

    /**
-     * @param transacted Whether session is transacted
+     * @param transacted      Whether session is transacted
     * @param acknowledgeMode session acknowledge mode
     * @return session proxy
     * @throws JMSException on error
@@ -152,7 +161,7 @@
    }

    /**
-     * @return
+     * @return client id from delegate
     * @throws JMSException
     */
    public String getClientID() throws JMSException {
@@ -160,7 +169,7 @@
    }

    /**
-     * @return
+     * @return exception listener from delegate
     * @throws JMSException
     */
public ExceptionListener getExceptionListener() throws JMSException {
@@ -168,7 +177,7 @@
    }

    /**
-     * @return
+     * @return connection metadata from delegate
     * @throws JMSException
     */
    public ConnectionMetaData getMetaData() throws JMSException {
@@ -176,7 +185,8 @@
    }

    /**
-     * @param clientID
+     * Sets client id on delegate
+     * @param clientID new clientId
     * @throws JMSException
     */
    public void setClientID(String clientID) throws JMSException {
@@ -184,7 +194,8 @@
    }

    /**
-     * @param listener
+     * sets exception listener on delegate
+     * @param listener new listener
     * @throws JMSException
     */
public void setExceptionListener(ExceptionListener listener) throws JMSException {

Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java?rev=813992&r1=813991&r2=813992&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java Fri Sep 11 19:31:11 2009
@@ -47,7 +47,7 @@

/**
* Acts as a pass through proxy for a JMS Session object. It intercepts events
- * that are of interest of the ActiveMQManagedConnection.
+ * that are of interest of the ActiveMQManagedConnection. There is one proxy for each session.
 *
 * @version $Revision$
 */
@@ -55,9 +55,11 @@

    private final ActiveMQSession session;
    private boolean closed;
+    private ManagedConnectionProxy connectionProxy;

-    public ManagedSessionProxy(ActiveMQSession session) {
+ public ManagedSessionProxy(ActiveMQSession session, ManagedConnectionProxy connectionProxy) {
        this.session = session;
+        this.connectionProxy = connectionProxy;
    }

public void setUseSharedTxContext(boolean enable) throws JMSException {
@@ -70,14 +72,17 @@
     * @throws JMSException
     */
    public void close() throws JMSException {
+       if (closed) {
+               return;
+        }
        cleanup();
+        connectionProxy.sessionClosed(this);
    }

    /**
- * Called by the ActiveMQManagedConnection to invalidate this proxy.
+     * Called by the ManagedConnectionProxy to invalidate this proxy.
     *
-     * @throws JMSException
-     * @throws JMSException
+     * @throws JMSException if session proxy has a problem
     */
    public void cleanup() throws JMSException {
        closed = true;
@@ -85,7 +90,9 @@
    }

    /**
-     *
+     *
+     * @return underlying session, unless this proxy is closed
+     * @throws javax.jms.JMSException if session is closed
     */
    private Session getSession() throws JMSException {
        if (closed) {



Rob Davies
I work here: http://fusesource.com
My Blog: http://rajdavies.blogspot.com/
I'm writing this: http://www.manning.com/snyder/




Reply via email to