Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=1345202&r1=1345201&r2=1345202&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Fri Jun 1 14:32:50 2012 @@ -30,6 +30,7 @@ import org.apache.activemq.store.ProxyTo import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.jdbc.JDBCMessageStore; import java.io.IOException; import java.util.ArrayList; @@ -45,16 +46,16 @@ import java.util.concurrent.Future; */ public class MemoryTransactionStore implements TransactionStore { - ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); - ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>(); - final PersistenceAdapter persistenceAdapter; + protected ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); + protected ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>(); + protected final PersistenceAdapter persistenceAdapter; private boolean doingRecover; public class Tx { - private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); + public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>(); - private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>(); + public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>(); public void add(AddMessageCommand msg) { messages.add(msg); @@ -114,6 +115,8 @@ public class MemoryTransactionStore impl public interface AddMessageCommand { Message getMessage(); + MessageStore getMessageStore(); + void run(ConnectionContext context) throws IOException; } @@ -121,6 +124,8 @@ public class MemoryTransactionStore impl MessageAck getMessageAck(); void run(ConnectionContext context) throws IOException; + + MessageStore getMessageStore(); } public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { @@ -164,7 +169,7 @@ public class MemoryTransactionStore impl } public TopicMessageStore proxy(TopicMessageStore messageStore) { - return new ProxyTopicMessageStore(messageStore) { + ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) { @Override public void addMessage(ConnectionContext context, final Message send) throws IOException { MemoryTransactionStore.this.addMessage(getDelegate(), send); @@ -204,12 +209,17 @@ public class MemoryTransactionStore impl subscriptionName, messageId, ack); } }; + onProxyTopicStore(proxyTopicMessageStore); + return proxyTopicMessageStore; + } + + protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) { } /** * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) */ - public void prepare(TransactionId txid) { + public void prepare(TransactionId txid) throws IOException { Tx tx = inflightTransactions.remove(txid); if (tx == null) { return; @@ -226,6 +236,15 @@ public class MemoryTransactionStore impl return tx; } + public Tx getPreparedTx(TransactionId txid) { + Tx tx = preparedTransactions.get(txid); + if (tx == null) { + tx = new Tx(); + preparedTransactions.put(txid, tx); + } + return tx; + } + public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { if (preCommit != null) { preCommit.run(); @@ -248,7 +267,7 @@ public class MemoryTransactionStore impl /** * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) */ - public void rollback(TransactionId txid) { + public void rollback(TransactionId txid) throws IOException { preparedTransactions.remove(txid); inflightTransactions.remove(txid); } @@ -268,12 +287,16 @@ public class MemoryTransactionStore impl Object txid = iter.next(); Tx tx = preparedTransactions.get(txid); listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); + onRecovered(tx); } } finally { this.doingRecover = false; } } + protected void onRecovered(Tx tx) { + } + /** * @param message * @throws IOException @@ -291,6 +314,11 @@ public class MemoryTransactionStore impl return message; } + @Override + public MessageStore getMessageStore() { + return destination; + } + public void run(ConnectionContext ctx) throws IOException { destination.addMessage(ctx, message); } @@ -320,13 +348,18 @@ public class MemoryTransactionStore impl public void run(ConnectionContext ctx) throws IOException { destination.removeMessage(ctx, ack); } + + @Override + public MessageStore getMessageStore() { + return destination; + } }); } else { destination.removeMessage(null, ack); } } - final void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName, + public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName, final MessageId messageId, final MessageAck ack) throws IOException { if (doingRecover) { return; @@ -342,6 +375,11 @@ public class MemoryTransactionStore impl public void run(ConnectionContext ctx) throws IOException { destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); } + + @Override + public MessageStore getMessageStore() { + return destination; + } }); } else { destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java?rev=1345202&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java Fri Jun 1 14:32:50 2012 @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.Test; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.apache.derby.jdbc.EmbeddedXADataSource; + +public class JdbcXARecoveryBrokerTest extends XARecoveryBrokerTest { + + EmbeddedXADataSource dataSource; + + @Override + protected void setUp() throws Exception { + dataSource = new EmbeddedXADataSource(); + dataSource.setDatabaseName("derbyDb"); + dataSource.setCreateDatabase("create"); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + stopDerby(); + } + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + jdbc.setDataSource(dataSource); + broker.setPersistenceAdapter(jdbc); + } + + @Override + protected void restartBroker() throws Exception { + broker.stop(); + stopDerby(); + dataSource = new EmbeddedXADataSource(); + dataSource.setDatabaseName("derbyDb"); + dataSource.setCreateDatabase("create"); + + broker = createRestartedBroker(); + broker.start(); + } + + private void stopDerby() { + LOG.info("STOPPING DB!@!!!!"); + final EmbeddedDataSource ds = dataSource; + try { + ds.setShutdownDatabase("shutdown"); + ds.getConnection(); + } catch (Exception ignored) { + } + + } + + public static Test suite() { + return suite(JdbcXARecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + @Override + protected ActiveMQDestination createDestination() { + return new ActiveMQQueue("test,special"); + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1345202&r1=1345201&r2=1345202&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Fri Jun 1 14:32:50 2012 @@ -23,9 +23,13 @@ import javax.management.MalformedObjectN import javax.management.ObjectName; import junit.framework.Test; +import org.apache.activemq.broker.jmx.DestinationView; +import org.apache.activemq.broker.jmx.DestinationViewMBean; import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DataArrayResponse; @@ -37,6 +41,7 @@ import org.apache.activemq.command.Sessi import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.util.JMXSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +53,8 @@ import org.slf4j.LoggerFactory; */ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class); + public boolean prioritySupport = false; + public void testPreparedJmxView() throws Exception { ActiveMQDestination destination = createDestination(); @@ -96,6 +103,10 @@ public class XARecoveryBrokerTest extend dar = (DataArrayResponse)response; assertEquals(4, dar.getData().length); + // validate destination depth via jmx + DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]); + assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize()); + TransactionId first = (TransactionId)dar.getData()[0]; // via jmx, force outcome for (int i = 0; i < 4; i++) { @@ -131,6 +142,16 @@ public class XARecoveryBrokerTest extend return proxy; } + private DestinationViewMBean getProxyToDestination(ActiveMQDestination destination) throws MalformedObjectNameException, JMSException { + + ObjectName objectName = new ObjectName("org.apache.activemq:Type=" + (destination.isQueue() ? "Queue" : "Topic") + ",Destination=" + + JMXSupport.encodeObjectNamePart(destination.getPhysicalName()) + ",BrokerName=localhost"); + DestinationViewMBean proxy = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(objectName, + DestinationViewMBean.class, true); + return proxy; + + } + public void testPreparedTransactionRecoveredOnRestart() throws Exception { ActiveMQDestination destination = createDestination(); @@ -213,6 +234,94 @@ public class XARecoveryBrokerTest extend assertNoMessagesLeft(connection); } + public void testTopicPreparedTransactionRecoveredOnRestart() throws Exception { + ActiveMQDestination destination = new ActiveMQTopic("TryTopic"); + + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + // Prepare 4 message sends. + for (int i = 0; i < 4; i++) { + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + // Prepare + connection.send(createPrepareTransaction(connectionInfo, txid)); + } + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. + restartBroker(); + + // Setup the consumer and try receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + + Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + DataArrayResponse dar = (DataArrayResponse) response; + assertEquals(4, dar.getData().length); + + // ensure we can close a connection with prepared transactions + connection.request(closeConnectionInfo(connectionInfo)); + + // open again to deliver outcome + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + // Commit the prepared transactions. + for (int i = 0; i < dar.getData().length; i++) { + connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i])); + } + + // We should get the committed transactions. + for (int i = 0; i < expectedMessageCount(4, destination); i++) { + Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10)); + assertNotNull(m); + } + + assertNoMessagesLeft(connection); + + } + public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception { ActiveMQDestination destination = createDestination(); @@ -260,6 +369,55 @@ public class XARecoveryBrokerTest extend assertNoMessagesLeft(connection); } + public void testQueuePersistentCommited2PhaseMessagesNotLostOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + for (int i = 0; i < 4; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + } + + // Commit 2 phase + connection.request(createPrepareTransaction(connectionInfo, txid)); + connection.send(createCommitTransaction2Phase(connectionInfo, txid)); + + connection.request(closeConnectionInfo(connectionInfo)); + // restart the broker. + restartBroker(); + + // Setup the consumer and receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + for (int i = 0; i < expectedMessageCount(4, destination); i++) { + Message m = receiveMessage(connection); + assertNotNull(m); + } + + assertNoMessagesLeft(connection); + } + public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception { ActiveMQDestination destination = createDestination(); @@ -396,6 +554,90 @@ public class XARecoveryBrokerTest extend assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); } + public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() { + addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE}); + } + + public void testTopicPersistentPreparedAcksNotLostOnRestart() throws Exception { + // REVISIT for kahadb + if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) { + LOG.warn("only works on jdbc"); + return; + } + ActiveMQDestination destination = new ActiveMQTopic("TryTopic"); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // setup durable subs + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + for (int i = 0; i < 4; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + final int messageCount = expectedMessageCount(4, destination); + Message m = null; + for (int i = 0; i < messageCount; i++) { + m = receiveMessage(connection); + assertNotNull("unexpected null on: " + i, m); + } + + // one ack with last received, mimic a beforeEnd synchronization + MessageAck ack = createAck(consumerInfo, m, messageCount, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // restart the broker. + restartBroker(); + + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + connection.send(connectionInfo); + + // validate recovery + TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER); + DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + + assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length); + assertEquals("it matches", txid, dataArrayResponse.getData()[0]); + + sessionInfo = createSessionInfo(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + // no redelivery, exactly once semantics unless there is rollback + m = receiveMessage(connection); + assertNull(m); + assertNoMessagesLeft(connection); + + connection.request(createCommitTransaction2Phase(connectionInfo, txid)); + + // validate recovery complete + dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); + } + public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception { ActiveMQDestination destination = createDestination(); @@ -409,7 +651,8 @@ public class XARecoveryBrokerTest extend connection.send(sessionInfo); connection.send(producerInfo); - for (int i = 0; i < 4; i++) { + int numMessages = 4; + for (int i = 0; i < numMessages; i++) { Message message = createMessage(producerInfo, destination); message.setPersistent(true); connection.send(message); @@ -426,13 +669,13 @@ public class XARecoveryBrokerTest extend consumerInfo = createConsumerInfo(sessionInfo, dest); connection.send(consumerInfo); - for (int i = 0; i < 4; i++) { + for (int i = 0; i < numMessages; i++) { message = receiveMessage(connection); assertNotNull(message); } // one ack with last received, mimic a beforeEnd synchronization - MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE); + MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); ack.setTransactionId(txid); connection.send(ack); } @@ -466,7 +709,7 @@ public class XARecoveryBrokerTest extend // rollback so we get redelivery connection.request(createRollbackTransaction(connectionInfo, txid)); - // Begin new transaction for redelivery + LOG.info("new tx for redelivery"); txid = createXATransaction(sessionInfo); connection.send(createBeginTransaction(connectionInfo, txid)); @@ -475,11 +718,11 @@ public class XARecoveryBrokerTest extend consumerInfo = createConsumerInfo(sessionInfo, dest); connection.send(consumerInfo); - for (int i = 0; i < 4; i++) { + for (int i = 0; i < numMessages; i++) { message = receiveMessage(connection); - assertNotNull(message); + assertNotNull("unexpected null on:" + i, message); } - MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE); + MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); ack.setTransactionId(txid); connection.send(ack); } @@ -492,6 +735,180 @@ public class XARecoveryBrokerTest extend assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); } + public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() { + addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE}); + } + + public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception { + + // REVISIT for kahadb + if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) { + LOG.warn("only works on jdbc"); + return; + } + + ActiveMQDestination destination = new ActiveMQTopic("TryTopic"); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // setup durable subs + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + int numMessages = 4; + for (int i = 0; i < numMessages; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = null; + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull(message); + } + + // one ack with last received, mimic a beforeEnd synchronization + MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // restart the broker. + restartBroker(); + + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + connection.send(connectionInfo); + + // validate recovery + TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER); + DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + + assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length); + assertEquals("it matches", txid, dataArrayResponse.getData()[0]); + + sessionInfo = createSessionInfo(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + // no redelivery, exactly once semantics while prepared + message = receiveMessage(connection); + assertNull(message); + assertNoMessagesLeft(connection); + + // rollback so we get redelivery + connection.request(createRollbackTransaction(connectionInfo, txid)); + + LOG.info("new tx for redelivery"); + txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull("unexpected null on:" + i, message); + } + ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + // Commit + connection.request(createCommitTransaction1Phase(connectionInfo, txid)); + + // validate recovery complete + dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); + } + + public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRollback() { + addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE}); + } + + public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception { + + // REVISIT for kahadb + if (! (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) { + LOG.warn("only works on jdbc"); + return; + } + + ActiveMQDestination destination = new ActiveMQTopic("TryTopic"); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // setup durable subs + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + int numMessages = 4; + for (int i = 0; i < numMessages; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = null; + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull(message); + } + + // one ack with last received, mimic a beforeEnd synchronization + MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // rollback so we get redelivery + connection.request(createRollbackTransaction(connectionInfo, txid)); + + LOG.info("new tx for redelivery"); + txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull("unexpected null on:" + i, message); + } + ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + // Commit + connection.request(createCommitTransaction1Phase(connectionInfo, txid)); + } + private ActiveMQDestination[] destinationList(ActiveMQDestination dest) { return dest.isComposite() ? dest.getCompositeDestinations() : new ActiveMQDestination[]{dest}; } @@ -564,6 +981,13 @@ public class XARecoveryBrokerTest extend assertNoMessagesLeft(connection); } + @Override + protected PolicyEntry getDefaultPolicy() { + PolicyEntry policyEntry = super.getDefaultPolicy(); + policyEntry.setPrioritizedMessages(prioritySupport); + return policyEntry; + } + public static Test suite() { return suite(XARecoveryBrokerTest.class); }
