Author: djencks
Date: Fri Sep 11 19:31:11 2009
New Revision: 813992
URL: http://svn.apache.org/viewvc?rev=813992&view=rev
Log:
AMQ-2166 unregister session proxies when they are closed. Patch
from Mario Siegenthaler, slightly modified
Modified:
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java
Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java?rev=813992&r1=813991&r2=813992&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedConnectionProxy.java Fri Sep 11 19:31:11 2009
@@ -17,7 +17,7 @@
package org.apache.activemq.ra;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
@@ -34,7 +34,6 @@
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
-
import org.apache.activemq.ActiveMQQueueSession;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicSession;
@@ -42,13 +41,13 @@
/**
* Acts as a pass through proxy for a JMS Connection object. It
intercepts
* events that are of interest of the ActiveMQManagedConnection.
- *
+ *
* @version $Revision$
*/
public class ManagedConnectionProxy implements Connection,
QueueConnection, TopicConnection, ExceptionListener {
private ActiveMQManagedConnection managedConnection;
- private ArrayList<ManagedSessionProxy> sessions = new
ArrayList<ManagedSessionProxy>();
+ private final List<ManagedSessionProxy> sessions = new
ArrayList<ManagedSessionProxy>();
private ExceptionListener exceptionListener;
public ManagedConnectionProxy(ActiveMQManagedConnection
managedConnection) {
@@ -58,7 +57,7 @@
/**
* Used to let the ActiveMQManagedConnection that this
connection handel is
* not needed by the app.
- *
+ *
* @throws JMSException
*/
public void close() throws JMSException {
@@ -73,18 +72,19 @@
public void cleanup() {
exceptionListener = null;
managedConnection = null;
- for (Iterator<ManagedSessionProxy> iter =
sessions.iterator(); iter.hasNext();) {
- ManagedSessionProxy p = iter.next();
- try {
- p.cleanup();
- } catch (JMSException ignore) {
+ synchronized (sessions) {
+ for (ManagedSessionProxy p : sessions) {
+ try {
+ //TODO is this dangerous? should we copy the
list before iterating?
+ p.cleanup();
+ } catch (JMSException ignore) {
+ }
}
- iter.remove();
+ sessions.clear();
}
}
/**
- *
* @return "physical" underlying activemq connection, if proxy
is associated with a managed connection
* @throws javax.jms.JMSException if managed connection is null
*/
@@ -96,7 +96,7 @@
}
/**
- * @param transacted Whether session is transacted
+ * @param transacted Whether session is transacted
* @param acknowledgeMode session acknowledge mode
* @return session proxy
* @throws JMSException on error
@@ -106,7 +106,7 @@
}
/**
- * @param transacted Whether session is transacted
+ * @param transacted Whether session is transacted
* @param acknowledgeMode session acknowledge mode
* @return session proxy
* @throws JMSException on error
@@ -115,24 +115,33 @@
if (!transacted && acknowledgeMode ==
Session.SESSION_TRANSACTED) {
acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
}
-// ActiveMQSession session =
(ActiveMQSession)getConnection().createSession(true, acknowledgeMode);
- ActiveMQSession session =
(ActiveMQSession)getConnection().createSession(transacted,
acknowledgeMode);
+ ActiveMQSession session = (ActiveMQSession)
getConnection().createSession(transacted, acknowledgeMode);
ManagedTransactionContext txContext = new
ManagedTransactionContext(managedConnection.getTransactionContext());
session.setTransactionContext(txContext);
- ManagedSessionProxy p = new ManagedSessionProxy(session);
+ ManagedSessionProxy p = new ManagedSessionProxy(session,
this);
p.setUseSharedTxContext(managedConnection.isInManagedTx());
- sessions.add(p);
+ synchronized (sessions) {
+ sessions.add(p);
+ }
return p;
}
+ protected void sessionClosed(ManagedSessionProxy session) {
+ synchronized (sessions) {
+ sessions.remove(session);
+ }
+ }
+
public void setUseSharedTxContext(boolean enable) throws
JMSException {
- for (ManagedSessionProxy p : sessions) {
- p.setUseSharedTxContext(enable);
+ synchronized (sessions) {
+ for (ManagedSessionProxy p : sessions) {
+ p.setUseSharedTxContext(enable);
+ }
}
}
/**
- * @param transacted Whether session is transacted
+ * @param transacted Whether session is transacted
* @param acknowledgeMode session acknowledge mode
* @return session proxy
* @throws JMSException on error
@@ -142,7 +151,7 @@
}
/**
- * @param transacted Whether session is transacted
+ * @param transacted Whether session is transacted
* @param acknowledgeMode session acknowledge mode
* @return session proxy
* @throws JMSException on error
@@ -152,7 +161,7 @@
}
/**
- * @return
+ * @return client id from delegate
* @throws JMSException
*/
public String getClientID() throws JMSException {
@@ -160,7 +169,7 @@
}
/**
- * @return
+ * @return exception listener from delegate
* @throws JMSException
*/
public ExceptionListener getExceptionListener() throws
JMSException {
@@ -168,7 +177,7 @@
}
/**
- * @return
+ * @return connection metadata from delegate
* @throws JMSException
*/
public ConnectionMetaData getMetaData() throws JMSException {
@@ -176,7 +185,8 @@
}
/**
- * @param clientID
+ * Sets client id on delegate
+ * @param clientID new clientId
* @throws JMSException
*/
public void setClientID(String clientID) throws JMSException {
@@ -184,7 +194,8 @@
}
/**
- * @param listener
+ * sets exception listener on delegate
+ * @param listener new listener
* @throws JMSException
*/
public void setExceptionListener(ExceptionListener listener)
throws JMSException {
Modified: activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java?rev=813992&r1=813991&r2=813992&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java (original)
+++ activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ManagedSessionProxy.java Fri Sep 11 19:31:11 2009
@@ -47,7 +47,7 @@
/**
* Acts as a pass through proxy for a JMS Session object. It
intercepts events
- * that are of interest of the ActiveMQManagedConnection.
+ * that are of interest of the ActiveMQManagedConnection. There is
one proxy for each session.
*
* @version $Revision$
*/
@@ -55,9 +55,11 @@
private final ActiveMQSession session;
private boolean closed;
+ private ManagedConnectionProxy connectionProxy;
- public ManagedSessionProxy(ActiveMQSession session) {
+ public ManagedSessionProxy(ActiveMQSession session,
ManagedConnectionProxy connectionProxy) {
this.session = session;
+ this.connectionProxy = connectionProxy;
}
public void setUseSharedTxContext(boolean enable) throws
JMSException {
@@ -70,14 +72,17 @@
* @throws JMSException
*/
public void close() throws JMSException {
+ if (closed) {
+ return;
+ }
cleanup();
+ connectionProxy.sessionClosed(this);
}
/**
- * Called by the ActiveMQManagedConnection to invalidate this
proxy.
+ * Called by the ManagedConnectionProxy to invalidate this proxy.
*
- * @throws JMSException
- * @throws JMSException
+ * @throws JMSException if session proxy has a problem
*/
public void cleanup() throws JMSException {
closed = true;
@@ -85,7 +90,9 @@
}
/**
- *
+ *
+ * @return underlying session, unless this proxy is closed
+ * @throws javax.jms.JMSException if session is closed
*/
private Session getSession() throws JMSException {
if (closed) {