Author: djencks
Date: Wed Jan 7 12:57:56 2009
New Revision: 732489
URL: http://svn.apache.org/viewvc?rev=732489&view=rev
Log:
AMQ-2034 move delay-session-close-in-xa-tx code to plain session so it works in
managed environments. Also prevent duplicate synchronization registrations
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=732489&r1=732488&r2=732489&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Jan 7 12:57:56 2009
@@ -139,7 +139,7 @@
* acknowledges all messages consumed by a session at when acknowledge()
* is called
*/
- public static final int INDIVIDUAL_ACKNOWLEDGE=4;
+ public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
public static interface DeliveryListener {
void beforeDelivery(ActiveMQSession session, Message msg);
@@ -163,6 +163,7 @@
protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers =
new CopyOnWriteArrayList<ActiveMQMessageProducer>();
protected boolean closed;
+ private volatile boolean synchronizationRegistered;
protected boolean asyncDispatch;
protected boolean sessionAsyncDispatch;
protected final boolean debug;
@@ -553,11 +554,34 @@
*/
public void close() throws JMSException {
if (!closed) {
- dispose();
- connection.asyncSendPacket(info.createRemoveCommand());
+ if (getTransacted()) {
+ if (!synchronizationRegistered) {
+ synchronizationRegistered = true;
+ getTransactionContext().addSynchronization(new Synchronization() {
+
+ public void afterCommit() throws Exception {
+ doClose();
+ synchronizationRegistered = false;
+ }
+
+ public void afterRollback() throws Exception {
+ doClose();
+ synchronizationRegistered = false;
+ }
+ });
+ }
+
+ } else {
+ doClose();
+ }
}
}
+ private void doClose() throws JMSException {
+ dispose();
+ connection.asyncSendPacket(info.createRemoveCommand());
+ }
+
void clearMessagesInProgress() {
executor.clearMessagesInProgress();
for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java?rev=732489&r1=732488&r2=732489&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java (original)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java Wed Jan 7 12:57:56 2009
@@ -27,7 +27,6 @@
import javax.transaction.xa.XAResource;
import org.apache.activemq.command.SessionId;
-import org.apache.activemq.transaction.Synchronization;
/**
* The XASession interface extends the capability of Session by adding access
@@ -97,24 +96,6 @@
return new ActiveMQTopicSession(this);
}
- @Override
- public void close() throws JMSException {
- if (getTransactionContext().isInXATransaction()) {
- getTransactionContext().addSynchronization(new Synchronization() {
- public void afterCommit() throws Exception {
- doClose();
- }
-
- public void afterRollback() throws Exception {
- doClose();
- }
- });
- }
- }
-
- void doClose() throws JMSException {
- super.close();
- }
/**
* This is called before transacted work is done by
* the session. XA Work can only be done when this