Author: djencks
Date: Mon Jan 19 18:06:37 2009
New Revision: 735912
URL: http://svn.apache.org/viewvc?rev=735912&view=rev
Log:
AMQ-2078 extend transaction tests to xa in ra
Added:
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java?rev=735912&r1=735911&r2=735912&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
Mon Jan 19 18:06:37 2009
@@ -56,18 +56,21 @@
Message[] outbound = new Message[] {session.createTextMessage("First
Message"), session.createTextMessage("Second Message")};
// lets consume any outstanding messages from previous test runs
+ beginTx();
while (consumer.receive(1000) != null) {
}
- session.commit();
+ commitTx();
+ beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
- session.commit();
+ commitTx();
LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>();
+ beginTx();
Message message = consumer.receive(1000);
assertEquals(outbound[0], message);
@@ -80,6 +83,7 @@
// Consume again.. the previous message should
// get redelivered.
+ beginTx();
message = consumer.receive(5000);
assertNotNull("Should have re-received the first message again!",
message);
messages.add(message);
@@ -89,7 +93,7 @@
assertNotNull("Should have re-received the second message again!",
message);
messages.add(message);
assertEquals(outbound[1], message);
- session.commit();
+ commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
@@ -111,24 +115,28 @@
// Session that sends messages
{
Session session = resourceProvider.createSession(connection);
+ this.session = session;
MessageProducer producer =
resourceProvider.createProducer(session, destination);
// consumer = resourceProvider.createConsumer(session,
// destination);
+ beginTx();
producer.send(session.createTextMessage("Test Message: " + i));
- session.commit();
+ commitTx();
session.close();
}
// Session that consumes messages
{
Session session = resourceProvider.createSession(connection);
+ this.session = session;
MessageConsumer consumer =
resourceProvider.createConsumer(session, destination);
+ beginTx();
TextMessage message = (TextMessage)consumer.receive(1000 * 5);
assertNotNull("Received only " + i + " messages in batch ",
message);
assertEquals("Test Message: " + i, message.getText());
- session.commit();
+ commitTx();
session.close();
}
}
@@ -145,20 +153,24 @@
Message[] outbound = new Message[] {session.createTextMessage("First
Message"), session.createTextMessage("Second Message"),
session.createTextMessage("Third Message")};
// lets consume any outstanding messages from previous test runs
+ beginTx();
while (consumer.receive(1000) != null) {
}
- session.commit();
+ commitTx();
+ beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
producer.send(outbound[2]);
- session.commit();
+ commitTx();
// Get the first.
+ beginTx();
assertEquals(outbound[0], consumer.receive(1000));
consumer.close();
- session.commit();
+ commitTx();
+ beginTx();
QueueBrowser browser = session.createBrowser((Queue)destination);
Enumeration enumeration = browser.getEnumeration();
@@ -187,7 +199,7 @@
assertEquals(outbound[2], consumer.receive(1000));
consumer.close();
- session.commit();
+ commitTx();
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java?rev=735912&r1=735911&r2=735912&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
Mon Jan 19 18:06:37 2009
@@ -85,11 +85,31 @@
resourceProvider = getJmsResourceProvider();
topic = resourceProvider.isTopic();
// We will be using transacted sessions.
- resourceProvider.setTransacted(true);
- connectionFactory = resourceProvider.createConnectionFactory();
+ setSessionTransacted();
+ connectionFactory = newConnectionFactory();
reconnect();
}
+ protected void setSessionTransacted() {
+ resourceProvider.setTransacted(true);
+ }
+
+ protected ConnectionFactory newConnectionFactory() throws Exception {
+ return resourceProvider.createConnectionFactory();
+ }
+
+ protected void beginTx() throws Exception {
+ //no-op for local tx
+ }
+
+ protected void commitTx() throws Exception {
+ session.commit();
+ }
+
+ protected void rollbackTx() throws Exception {
+ session.rollback();
+ }
+
/**
*/
protected BrokerService createBroker() throws Exception,
URISyntaxException {
@@ -124,24 +144,25 @@
public void testSendReceiveTransactedBatches() throws Exception {
TextMessage message = session.createTextMessage("Batch Message");
-
for (int j = 0; j < batchCount; j++) {
LOG.info("Producing bacth " + j + " of " + batchSize + "
messages");
+ beginTx();
for (int i = 0; i < batchSize; i++) {
producer.send(message);
}
messageSent();
- session.commit();
+ commitTx();
LOG.info("Consuming bacth " + j + " of " + batchSize + "
messages");
+ beginTx();
for (int i = 0; i < batchSize; i++) {
message = (TextMessage)consumer.receive(1000 * 5);
assertNotNull("Received only " + i + " messages in batch " +
j, message);
assertEquals("Batch Message", message.getText());
}
- session.commit();
+ commitTx();
}
}
@@ -158,18 +179,22 @@
Message[] outbound = new Message[] {session.createTextMessage("First
Message"), session.createTextMessage("Second Message")};
// sends a message
+ beginTx();
producer.send(outbound[0]);
- session.commit();
+ commitTx();
// sends a message that gets rollbacked
+ beginTx();
producer.send(session.createTextMessage("I'm going to get rolled
back."));
- session.rollback();
+ rollbackTx();
// sends a message
+ beginTx();
producer.send(outbound[1]);
- session.commit();
+ commitTx();
// receives the first message
+ beginTx();
ArrayList<Message> messages = new ArrayList<Message>();
LOG.info("About to consume message 1");
Message message = consumer.receive(1000);
@@ -183,26 +208,58 @@
LOG.info("Received: " + message);
// validates that the rollbacked was not consumed
- session.commit();
+ commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
}
/**
+ * JMS spec section 3.6: acking a message in a session with automatic acks has no effect.
+ * @throws Exception
+ */
+ public void testAckMessageInTx() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First
Message")};
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[0]);
+ outbound[0].acknowledge();
+ commitTx();
+ outbound[0].acknowledge();
+
+ // receives the first message
+ beginTx();
+ ArrayList<Message> messages = new ArrayList<Message>();
+ LOG.info("About to consume message 1");
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // validates that the sent message was still delivered despite the acknowledge() calls
+ commitTx();
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Message not delivered.", outbound, inbound);
+ }
+
+ /**
* Sends a batch of messages and validates that the message sent before
* session close is not consumed.
- *
+ *
+ * This test only works with local transactions, not xa.
* @throws Exception
*/
public void testSendSessionClose() throws Exception {
Message[] outbound = new Message[] {session.createTextMessage("First
Message"), session.createTextMessage("Second Message")};
// sends a message
+ beginTx();
producer.send(outbound[0]);
- session.commit();
+ commitTx();
// sends a message that gets rollbacked
+ beginTx();
producer.send(session.createTextMessage("I'm going to get rolled
back."));
consumer.close();
@@ -210,11 +267,12 @@
// sends a message
producer.send(outbound[1]);
- session.commit();
+ commitTx();
// receives the first message
ArrayList<Message> messages = new ArrayList<Message>();
LOG.info("About to consume message 1");
+ beginTx();
Message message = consumer.receive(1000);
messages.add(message);
LOG.info("Received: " + message);
@@ -226,7 +284,7 @@
LOG.info("Received: " + message);
// validates that the rollbacked was not consumed
- session.commit();
+ commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
@@ -242,10 +300,12 @@
Message[] outbound = new Message[] {session.createTextMessage("First
Message"), session.createTextMessage("Second Message")};
// sends a message
+ beginTx();
producer.send(outbound[0]);
- session.commit();
+ commitTx();
// sends a message that gets rollbacked
+ beginTx();
producer.send(session.createTextMessage("I'm going to get rolled
back."));
consumer.close();
session.close();
@@ -253,12 +313,14 @@
reconnect();
// sends a message
+ beginTx();
producer.send(outbound[1]);
- session.commit();
+ commitTx();
// receives the first message
ArrayList<Message> messages = new ArrayList<Message>();
LOG.info("About to consume message 1");
+ beginTx();
Message message = consumer.receive(1000);
messages.add(message);
LOG.info("Received: " + message);
@@ -270,7 +332,7 @@
LOG.info("Received: " + message);
// validates that the rollbacked was not consumed
- session.commit();
+ commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
@@ -286,36 +348,41 @@
Message[] outbound = new Message[] {session.createTextMessage("First
Message"), session.createTextMessage("Second Message")};
// lets consume any outstanding messages from prev test runs
+ beginTx();
while (consumer.receive(1000) != null) {
}
- session.commit();
+ commitTx();
// sent both messages
+ beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
- session.commit();
+ commitTx();
LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>();
+ beginTx();
Message message = consumer.receive(1000);
messages.add(message);
assertEquals(outbound[0], message);
- session.commit();
+ commitTx();
// rollback so we can get that last message again.
+ beginTx();
message = consumer.receive(1000);
assertNotNull(message);
assertEquals(outbound[1], message);
- session.rollback();
+ rollbackTx();
// Consume again.. the prev message should
// get redelivered.
+ beginTx();
message = consumer.receive(5000);
assertNotNull("Should have re-received the message again!", message);
messages.add(message);
- session.commit();
+ commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
@@ -332,29 +399,33 @@
Message[] outbound = new Message[] {session.createTextMessage("First
Message"), session.createTextMessage("Second Message")};
// lets consume any outstanding messages from prev test runs
+ beginTx();
while (consumer.receive(1000) != null) {
}
- session.commit();
+ commitTx();
//
+ beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
- session.commit();
+ commitTx();
LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>();
+ beginTx();
Message message = consumer.receive(1000);
assertEquals(outbound[0], message);
message = consumer.receive(1000);
assertNotNull(message);
assertEquals(outbound[1], message);
- session.rollback();
+ rollbackTx();
// Consume again.. the prev message should
// get redelivered.
+ beginTx();
message = consumer.receive(5000);
assertNotNull("Should have re-received the first message again!",
message);
messages.add(message);
@@ -365,7 +436,7 @@
assertEquals(outbound[1], message);
assertNull(consumer.receiveNoWait());
- session.commit();
+ commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
@@ -383,13 +454,15 @@
Message[] outbound = new Message[] {session.createTextMessage("First
Message"), session.createTextMessage("Second Message"),
session.createTextMessage("Third Message"),
session.createTextMessage("Fourth
Message")};
+ beginTx();
for (int i = 0; i < outbound.length; i++) {
// sends a message
producer.send(outbound[i]);
}
- session.commit();
+ commitTx();
// receives the first message
+ beginTx();
for (int i = 0; i < outbound.length; i++) {
LOG.info("About to consume message 1");
Message message = consumer.receive(1000);
@@ -398,7 +471,7 @@
}
// validates that the rollbacked was not consumed
- session.commit();
+ commitTx();
}
/**
@@ -446,33 +519,37 @@
TextMessage[] outbound = new TextMessage[]
{session.createTextMessage("First Message"), session.createTextMessage("Second
Message")};
// lets consume any outstanding messages from prev test runs
+ beginTx();
while (consumer.receiveNoWait() != null) {
}
- session.commit();
+ commitTx();
// sends the messages
+ beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
- session.commit();
+ commitTx();
LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]);
+ beginTx();
TextMessage message = (TextMessage)consumer.receive(1000);
assertEquals(outbound[0].getText(), message.getText());
// Close the consumer before the commit. This should not cause the
// received message
// to rollback.
consumer.close();
- session.commit();
+ commitTx();
// Create a new consumer
consumer = resourceProvider.createConsumer(session, destination);
LOG.info("Created consumer: " + consumer);
+ beginTx();
message = (TextMessage)consumer.receive(1000);
assertEquals(outbound[1].getText(), message.getText());
- session.commit();
+ commitTx();
}
public void testChangeMutableObjectInObjectMessageThenRollback() throws
Exception {
@@ -481,10 +558,12 @@
Message outbound = session.createObjectMessage(list);
outbound.setStringProperty("foo", "abc");
+ beginTx();
producer.send(outbound);
- session.commit();
+ commitTx();
LOG.info("About to consume message 1");
+ beginTx();
Message message = consumer.receive(5000);
List<String> body = assertReceivedObjectMessageWithListBody(message);
@@ -498,12 +577,13 @@
}
body.clear();
body.add("This should never be seen!");
- session.rollback();
+ rollbackTx();
+ beginTx();
message = consumer.receive(5000);
List<String> secondBody =
assertReceivedObjectMessageWithListBody(message);
assertNotSame("Second call should return a different body",
secondBody, body);
- session.commit();
+ commitTx();
}
@SuppressWarnings("unchecked")
@@ -526,7 +606,7 @@
*
* @throws JMSException
*/
- protected void reconnect() throws JMSException {
+ protected void reconnect() throws Exception {
if (connection != null) {
// Close the prev connection.
@@ -558,19 +638,24 @@
* Sets the prefeftch policy to one.
*/
protected void setPrefetchToOne() {
- ActiveMQPrefetchPolicy prefetchPolicy =
((ActiveMQConnection)connection).getPrefetchPolicy();
+ ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(1);
prefetchPolicy.setTopicPrefetch(1);
prefetchPolicy.setDurableTopicPrefetch(1);
prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
}
+ protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
+ return ((ActiveMQConnection)connection).getPrefetchPolicy();
+ }
+
+ //This test won't work with xa tx so no beginTx() has been added.
public void testMessageListener() throws Exception {
// send messages
for (int i = 0; i < MESSAGE_COUNT; i++) {
producer.send(session.createTextMessage(MESSAGE_TEXT + i));
}
- session.commit();
+ commitTx();
consumer.setMessageListener(this);
// wait receive
waitReceiveUnack();
@@ -589,7 +674,7 @@
unackMessages.add(message);
if (unackMessages.size() == MESSAGE_COUNT) {
try {
- session.rollback();
+ rollbackTx();
resendPhase = true;
} catch (Exception e) {
e.printStackTrace();
@@ -599,7 +684,7 @@
ackMessages.add(message);
if (ackMessages.size() == MESSAGE_COUNT) {
try {
- session.commit();
+ commitTx();
} catch (Exception e) {
e.printStackTrace();
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsResourceProvider.java?rev=735912&r1=735911&r2=735912&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
Mon Jan 19 18:06:37 2009
@@ -197,6 +197,9 @@
*/
public void setTransacted(boolean transacted) {
this.transacted = transacted;
+ if (transacted) {
+ setAckMode(Session.SESSION_TRANSACTED);
+ }
}
/**
Added:
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java?rev=735912&view=auto
==============================================================================
---
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
(added)
+++
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
Mon Jan 19 18:06:37 2009
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.activemq.ra;
+
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import javax.resource.spi.ManagedConnection;
+import javax.resource.ResourceException;
+
+import org.apache.activemq.*;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * @version $Rev:$ $Date:$
+ */
+public class JmsXAQueueTransactionTest extends JmsQueueTransactionTest {
+ private static final String DEFAULT_HOST = "vm://localhost";
+
+ private ConnectionManagerAdapter connectionManager = new ConnectionManagerAdapter();
+ private ActiveMQManagedConnectionFactory managedConnectionFactory;
+ private XAResource xaResource;
+ private static long txGenerator;
+ private Xid xid;
+
+
+ @Override
+ protected void setSessionTransacted() {
+ resourceProvider.setTransacted(false);
+ resourceProvider.setAckMode(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ @Override
+ protected ConnectionFactory newConnectionFactory() throws Exception {
+ managedConnectionFactory = new ActiveMQManagedConnectionFactory();
+ managedConnectionFactory.setServerUrl(DEFAULT_HOST);
+ managedConnectionFactory.setUserName(org.apache.activemq.ActiveMQConnectionFactory.DEFAULT_USER);
+ managedConnectionFactory.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
+
+ return (ConnectionFactory)managedConnectionFactory.createConnectionFactory(connectionManager);
+ }
+
+
+ /**
+ * Recreates the connection and caches the XAResource of its managed connection.
+ *
+ * @throws Exception if reconnecting or obtaining the XAResource fails
+ */
+ @Override
+ protected void reconnect() throws Exception {
+ super.reconnect();
+ ManagedConnectionProxy proxy = (ManagedConnectionProxy) connection;
+ ManagedConnection mc = proxy.getManagedConnection();
+ xaResource = mc.getXAResource();
+ }
+
+ @Override
+ protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
+ ManagedConnectionProxy proxy = (ManagedConnectionProxy) connection;
+ ActiveMQManagedConnection mc = proxy.getManagedConnection();
+ ActiveMQConnection conn = (ActiveMQConnection) mc.getPhysicalConnection();
+ return conn.getPrefetchPolicy();
+ }
+
+ @Override
+ protected void beginTx() throws Exception {
+ xid = createXid();
+ xaResource.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ @Override
+ protected void commitTx() throws Exception {
+ xaResource.end(xid, XAResource.TMSUCCESS);
+ int result = xaResource.prepare(xid);
+ if (result == XAResource.XA_OK) {
+ xaResource.commit(xid, false);
+ }
+ xid = null;
+ }
+
+ @Override
+ protected void rollbackTx() throws Exception {
+ xaResource.end(xid, XAResource.TMSUCCESS);
+ xaResource.rollback(xid);
+ xid = null;
+ }
+
+ // This test won't work with XA transactions, so it is overridden to do nothing here.
+ @Override
+ public void testMessageListener() throws Exception {
+ }
+
+ /**
+ * Sends a batch of messages and validates that the message sent before
+ * session close is not consumed.
+ * <p/>
+ * This test only works with local transactions, not XA, so it is overridden to do nothing here.
+ *
+ * @throws Exception
+ */
+ @Override
+ public void testSendSessionClose() throws Exception {
+ }
+
+ public Xid createXid() throws IOException {
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream os = new DataOutputStream(baos);
+ os.writeLong(++txGenerator);
+ os.close();
+ final byte[] bs = baos.toByteArray();
+
+ return new Xid() {
+ public int getFormatId() {
+ return 86;
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return bs;
+ }
+
+ public byte[] getBranchQualifier() {
+ return bs;
+ }
+ };
+
+ }
+
+}
Propchange:
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange:
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/JmsXAQueueTransactionTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain