Author: gtully
Date: Mon Dec 15 10:48:51 2008
New Revision: 726764
URL: http://svn.apache.org/viewvc?rev=726764&view=rev
Log:
fix - AMQ-2034 - have close in XA transaction deferred to synchronisation after
completion, have rollback call beforeEnd to propagate acknowledgements; add a
bunch of tests
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon Dec 15 10:48:51 2008
@@ -590,11 +590,27 @@
*/
public void close() throws JMSException {
if (!unconsumedMessages.isClosed()) {
- dispose();
- this.session.asyncSendPacket(info.createRemoveCommand());
+ if (session.isTransacted() &&
session.getTransactionContext().getTransactionId() != null) {
+ session.getTransactionContext().addSynchronization(new
Synchronization() {
+ public void afterCommit() throws Exception {
+ doClose();
+ }
+
+ public void afterRollback() throws Exception {
+ doClose();
+ }
+ });
+ } else {
+ doClose();
+ }
}
}
+ void doClose() throws JMSException {
+ dispose();
+ this.session.asyncSendPacket(info.createRemoveCommand());
+ }
+
void clearMessagesInProgress() {
// we are called from inside the transport reconnection logic
// which involves us clearing all the connections' consumers
@@ -653,10 +669,14 @@
// }
// Do we have any acks we need to send out before closing?
- // Ack any delivered messages now. (session may still
- // commit/rollback the acks).
+ // Ack any delivered messages now.
// only processes optimized acknowledgements
- deliverAcks();
+ if (!session.isTransacted()) {
+ deliverAcks();
+ if (session.isDupsOkAcknowledge()) {
+ acknowledge();
+ }
+ }
if (executorService != null) {
executorService.shutdown();
try {
@@ -665,9 +685,7 @@
Thread.currentThread().interrupt();
}
}
- if (session.isTransacted() || session.isDupsOkAcknowledge()) {
- acknowledge();
- }
+
if (session.isClientAcknowledge()) {
if (!this.info.isBrowser()) {
// rollback duplicates that aren't acknowledged
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
Mon Dec 15 10:48:51 2008
@@ -27,6 +27,7 @@
import javax.transaction.xa.XAResource;
import org.apache.activemq.command.SessionId;
+import org.apache.activemq.transaction.Synchronization;
/**
* The XASession interface extends the capability of Session by adding access
@@ -96,6 +97,24 @@
return new ActiveMQTopicSession(this);
}
+ @Override
+ public void close() throws JMSException {
+ if (getTransactionContext().isInXATransaction()) {
+ getTransactionContext().addSynchronization(new Synchronization() {
+ public void afterCommit() throws Exception {
+ doClose();
+ }
+
+ public void afterRollback() throws Exception {
+ doClose();
+ }
+ });
+ }
+ }
+
+ void doClose() throws JMSException {
+ super.close();
+ }
/**
* This is called before transacted work is done by
* the session. XA Work can only be done when this
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Mon Dec 15 10:48:51 2008
@@ -224,6 +224,7 @@
throw new TransactionInProgressException("Cannot rollback() if an
XA transaction is already in progress ");
}
+ beforeEnd();
if (transactionId != null) {
TransactionInfo info = new TransactionInfo(getConnectionId(),
transactionId, TransactionInfo.ROLLBACK);
this.transactionId = null;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
Mon Dec 15 10:48:51 2008
@@ -16,27 +16,39 @@
*/
package org.apache.activemq;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XASession;
import javax.jms.XATopicConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.transport.stomp.StompTransportFilter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
private static final Log LOG =
LogFactory.getLog(ActiveMQXAConnectionFactoryTest.class);
+ long txGenerator = System.currentTimeMillis();
public void testCopy() throws URISyntaxException, JMSException {
ActiveMQXAConnectionFactory cf = new
ActiveMQXAConnectionFactory("vm://localhost?");
@@ -117,6 +129,126 @@
connection2.close();
}
+ public void testVanilaTransactionalProduceReceive() throws Exception {
+
+ ActiveMQXAConnectionFactory cf1 = new
ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+ XAConnection connection1 = (XAConnection)cf1.createConnection();
+ connection1.start();
+ XASession session = connection1.createXASession();
+ XAResource resource = session.getXAResource();
+ Destination dest = new ActiveMQQueue(getName());
+
+ // publish a message
+ Xid tid = createXid();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ MessageProducer producer = session.createProducer(dest);
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText(getName());
+ producer.send(message);
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+ session.close();
+
+ session = connection1.createXASession();
+ MessageConsumer consumer = session.createConsumer(dest);
+ tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+ assertNotNull(receivedMessage);
+ assertEquals(getName(), receivedMessage.getText());
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+ }
+
+ public void testConsumerCloseTransactionalSendReceive() throws Exception {
+
+ ActiveMQXAConnectionFactory cf1 = new
ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+ XAConnection connection1 = (XAConnection)cf1.createConnection();
+ connection1.start();
+ XASession session = connection1.createXASession();
+ XAResource resource = session.getXAResource();
+ Destination dest = new ActiveMQQueue(getName());
+
+ // publish a message
+ Xid tid = createXid();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ MessageProducer producer = session.createProducer(dest);
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText(getName());
+ producer.send(message);
+ producer.close();
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+ session.close();
+
+ session = connection1.createXASession();
+ MessageConsumer consumer = session.createConsumer(dest);
+ tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+ consumer.close();
+ assertNotNull(receivedMessage);
+ assertEquals(getName(), receivedMessage.getText());
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+
+ session = connection1.createXASession();
+ consumer = session.createConsumer(dest);
+ tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ assertNull(consumer.receive(1000));
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+
+ }
+
+ public void testSessionCloseTransactionalSendReceive() throws Exception {
+
+ ActiveMQXAConnectionFactory cf1 = new
ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+ XAConnection connection1 = (XAConnection)cf1.createConnection();
+ connection1.start();
+ XASession session = connection1.createXASession();
+ XAResource resource = session.getXAResource();
+ Destination dest = new ActiveMQQueue(getName());
+
+ // publish a message
+ Xid tid = createXid();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ MessageProducer producer = session.createProducer(dest);
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText(getName());
+ producer.send(message);
+ session.close();
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+
+
+ session = connection1.createXASession();
+ MessageConsumer consumer = session.createConsumer(dest);
+ tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+ session.close();
+ assertNotNull(receivedMessage);
+ assertEquals(getName(), receivedMessage.getText());
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+
+ session = connection1.createXASession();
+ consumer = session.createConsumer(dest);
+ tid = createXid();
+ resource = session.getXAResource();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ assertNull(consumer.receive(1000));
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.commit(tid, true);
+ }
+
+
protected void assertCreateConnection(String uri) throws Exception {
// Start up a broker with a tcp connector.
BrokerService broker = new BrokerService();
@@ -161,5 +293,29 @@
assertTrue("Should be an XATopicConnection", connection instanceof
XATopicConnection);
assertTrue("Should be an XAQueueConnection", connection instanceof
XAQueueConnection);
}
+
+ public Xid createXid() throws IOException {
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream os = new DataOutputStream(baos);
+ os.writeLong(++txGenerator);
+ os.close();
+ final byte[] bs = baos.toByteArray();
+
+ return new Xid() {
+ public int getFormatId() {
+ return 86;
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return bs;
+ }
+
+ public byte[] getBranchQualifier() {
+ return bs;
+ }
+ };
+
+ }
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
Mon Dec 15 10:48:51 2008
@@ -652,5 +652,35 @@
assertNull(redispatchConsumer.receive(500));
redispatchSession.close();
}
+
+ public void testRedispatchOfRolledbackTx() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ destination = createDestination(session,
ActiveMQDestination.QUEUE_TYPE);
+
+ sendMessages(connection, destination, 1);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ assertNotNull(consumer.receive(1000));
+
+ // install another consumer while message dispatch is
unacked/uncommitted
+ Session redispatchSession = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageConsumer redispatchConsumer =
redispatchSession.createConsumer(destination);
+
+ session.rollback();
+ session.close();
+
+ Message msg = redispatchConsumer.receive(1000);
+ assertNotNull(msg);
+ assertTrue(msg.getJMSRedelivered());
+ // should have re-delivery of 2, one for re-dispatch, one for rollback
which is a little too much!
+ assertEquals(3, msg.getLongProperty("JMSXDeliveryCount"));
+ redispatchSession.commit();
+
+ assertNull(redispatchConsumer.receive(500));
+ redispatchSession.close();
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
Mon Dec 15 10:48:51 2008
@@ -157,7 +157,8 @@
// Get the first.
assertEquals(outbound[0], consumer.receive(1000));
consumer.close();
-
+ session.commit();
+
QueueBrowser browser = session.createBrowser((Queue)destination);
Enumeration enumeration = browser.getEnumeration();
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
Mon Dec 15 10:48:51 2008
@@ -37,7 +37,7 @@
protected int numberOfMessagesOnQueue = 1;
private Connection connection;
- public void testVerifyCloseRedeliveryWithFailoverTransport() throws
Throwable {
+ public void testVerifySessionCloseRedeliveryWithFailoverTransport() throws
Throwable {
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(destination);
@@ -57,7 +57,46 @@
assertEquals("redelivered message", id, message.getJMSMessageID());
assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
}
+
+ public void
testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws
Throwable {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ Message message = consumer.receive(1000);
+ String id = message.getJMSMessageID();
+ assertNotNull(message);
+ LOG.info("got message " + message);
+ consumer.close();
+ session.close();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ consumer = session.createConsumer(destination);
+
+ message = consumer.receive(1000);
+ session.commit();
+ assertNotNull(message);
+ assertEquals("redelivered message", id, message.getJMSMessageID());
+ assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+ }
+ public void
testVerifyConsumerCloseSessionRollbackRedeliveryWithFailoverTransport() throws
Throwable {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ Message message = consumer.receive(1000);
+ String id = message.getJMSMessageID();
+ assertNotNull(message);
+ LOG.info("got message " + message);
+ consumer.close();
+ session.rollback();
+
+ consumer = session.createConsumer(destination);
+ message = consumer.receive(1000);
+ session.commit();
+ assertNotNull(message);
+ assertEquals("redelivered message", id, message.getJMSMessageID());
+ assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+ }
+
protected void setUp() throws Exception {
super.setUp();
Modified:
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
---
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
(original)
+++
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
Mon Dec 15 10:48:51 2008
@@ -25,7 +25,9 @@
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -188,6 +190,94 @@
}
+ public void testMessageExceptionReDelivery() throws Exception {
+
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+ adapter.setServerUrl("vm://localhost?broker.persistent=false");
+ adapter.start(new StubBootstrapContext());
+
+ final CountDownLatch messageDelivered = new CountDownLatch(2);
+
+ final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+ public void onMessage(Message message) {
+ super.onMessage(message);
+ try {
+ messageDelivered.countDown();
+ if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
+ throw new RuntimeException(getName() + " ex on first
delivery");
+ } else {
+ try {
+ assertTrue(message.getJMSRedelivered());
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
+ } catch (InterruptedException ignored) {
+ }
+ };
+
+ public void afterDelivery() throws ResourceException {
+ try {
+ if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
+ xaresource.end(xid, XAResource.TMFAIL);
+ xaresource.rollback(xid);
+ } else {
+ xaresource.end(xid, XAResource.TMSUCCESS);
+ xaresource.prepare(xid);
+ xaresource.commit(xid, false);
+ }
+ } catch (Throwable e) {
+ throw new ResourceException(e);
+ }
+ }
+ };
+
+ ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+ activationSpec.setDestinationType(Queue.class.getName());
+ activationSpec.setDestination("TEST");
+ activationSpec.setResourceAdapter(adapter);
+ activationSpec.validate();
+
+ MessageEndpointFactory messageEndpointFactory = new
MessageEndpointFactory() {
+ public MessageEndpoint createEndpoint(XAResource resource) throws
UnavailableException {
+ endpoint.xaresource = resource;
+ return endpoint;
+ }
+
+ public boolean isDeliveryTransacted(Method method) throws
NoSuchMethodException {
+ return true;
+ }
+ };
+
+ // Activate an Endpoint
+ adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+ // Give endpoint a chance to setup and register its listeners
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+
+ }
+
+ // Send the broker a message to that endpoint
+ MessageProducer producer = session.createProducer(new
ActiveMQQueue("TEST"));
+ producer.send(session.createTextMessage("Hello!"));
+ connection.close();
+
+ // Wait for the message to be delivered twice.
+ assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
+
+ // Shut the Endpoint down.
+ adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
+ adapter.stop();
+
+ }
+
+
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);