Author: gtully
Date: Fri Sep 30 11:59:44 2011
New Revision: 1177619
URL: http://svn.apache.org/viewvc?rev=1177619&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3519 - Allow the JMSRedelivered flag to
survive a restart. Add a transactedIndividualAck flag to the connection factory and
rewriteOnRedelivery to KahaDBPersistenceAdapter. These combine to persist the
redelivery status on a rollback.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Fri Sep 30 11:59:44 2011
@@ -193,6 +193,7 @@ public class ActiveMQConnection implemen
private long consumerFailoverRedeliveryWaitPeriod;
private final Scheduler scheduler;
private boolean messagePrioritySupported=true;
+ private boolean transactedIndividualAck = false;
/**
* Construct an <code>ActiveMQConnection</code>
@@ -2399,6 +2400,15 @@ public class ActiveMQConnection implemen
this.checkForDuplicates = checkForDuplicates;
}
+
+ public boolean isTransactedIndividualAck() {
+ return transactedIndividualAck;
+ }
+
+ public void setTransactedIndividualAck(boolean transactedIndividualAck) {
+ this.transactedIndividualAck = transactedIndividualAck;
+ }
+
/**
* Removes any TempDestinations that this connection has cached, ignoring
* any exceptions generated because the destination is in use as they
should
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Fri Sep 30 11:59:44 2011
@@ -119,6 +119,7 @@ public class ActiveMQConnectionFactory e
private boolean checkForDuplicates = true;
private ClientInternalExceptionListener clientInternalExceptionListener;
private boolean messagePrioritySupported = true;
+ private boolean transactedIndividualAck = false;
// /////////////////////////////////////////////
//
@@ -325,6 +326,7 @@ public class ActiveMQConnectionFactory e
connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
connection.setCheckForDuplicates(isCheckForDuplicates());
connection.setMessagePrioritySupported(isMessagePrioritySupported());
+ connection.setTransactedIndividualAck(isTransactedIndividualAck());
if (transportListener != null) {
connection.addTransportListener(transportListener);
}
@@ -707,6 +709,8 @@ public class ActiveMQConnectionFactory e
props.setProperty("auditMaximumProducerNumber",
Integer.toString(getAuditMaximumProducerNumber()));
props.setProperty("checkForDuplicates",
Boolean.toString(isCheckForDuplicates()));
props.setProperty("messagePrioritySupported",
Boolean.toString(isMessagePrioritySupported()));
+ props.setProperty("transactedIndividualAck",
Boolean.toString(isTransactedIndividualAck()));
+
}
public boolean isUseCompression() {
@@ -1019,4 +1023,18 @@ public class ActiveMQConnectionFactory e
public void setCheckForDuplicates(boolean checkForDuplicates) {
this.checkForDuplicates = checkForDuplicates;
}
+
+ public boolean isTransactedIndividualAck() {
+ return transactedIndividualAck;
+ }
+
+ /**
+ * when true, submit individual transacted acks immediately rather than
with transaction completion.
+ * This allows the acks to represent delivery status which can be
persisted on rollback
+ * Used in conjunction with
org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean)
true
+ */
+ public void setTransactedIndividualAck(boolean transactedIndividualAck) {
+ this.transactedIndividualAck = transactedIndividualAck;
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Fri Sep 30 11:59:44 2011
@@ -152,6 +152,7 @@ public class ActiveMQMessageConsumer imp
private long optimizeAckTimestamp = System.currentTimeMillis();
private long optimizeAcknowledgeTimeOut = 0;
private long failoverRedeliveryWaitPeriod = 0;
+ private boolean transactedIndividualAck = false;
/**
* Create a MessageConsumer
@@ -249,6 +250,7 @@ public class ActiveMQMessageConsumer imp
}
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
this.failoverRedeliveryWaitPeriod =
session.connection.getConsumerFailoverRedeliveryWaitPeriod();
+ this.transactedIndividualAck =
session.connection.isTransactedIndividualAck();
if (messageListener != null) {
setMessageListener(messageListener);
}
@@ -678,7 +680,7 @@ public class ActiveMQMessageConsumer imp
synchronized (unconsumedMessages.getMutex()) {
if (inProgressClearRequiredFlag) {
if (LOG.isDebugEnabled()) {
- LOG.debug(getConsumerId() + " clearing dispatched list
(" + unconsumedMessages.size() + ") on transport interrupt");
+ LOG.debug(getConsumerId() + " clearing unconsumed list
(" + unconsumedMessages.size() + ") on transport interrupt");
}
// ensure unconsumed are rolledback up front as they may
get redelivered to another consumer
List<MessageDispatch> list =
unconsumedMessages.removeAll();
@@ -833,11 +835,24 @@ public class ActiveMQMessageConsumer imp
deliveredMessages.addFirst(md);
}
if (session.getTransacted()) {
- ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+ if (transactedIndividualAck) {
+ immediateIndividualTransactedAck(md);
+ } else {
+ ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+ }
}
}
}
-
+
+ private void immediateIndividualTransactedAck(MessageDispatch md) throws
JMSException {
+ // acks accumulate on the broker pending transaction completion to
indicate
+ // delivery status
+ registerSync();
+ MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
+
ack.setTransactionId(session.getTransactionContext().getTransactionId());
+ session.sendAck(ack);
+ }
+
private void afterMessageIsConsumed(MessageDispatch md, boolean
messageExpired) throws JMSException {
if (unconsumedMessages.isClosed()) {
return;
@@ -919,29 +934,7 @@ public class ActiveMQMessageConsumer imp
// Don't acknowledge now, but we may need to let the broker know the
// consumer got the message to expand the pre-fetch window
if (session.getTransacted()) {
- session.doStartTransaction();
- if (!synchronizationRegistered) {
- synchronizationRegistered = true;
- session.getTransactionContext().addSynchronization(new
Synchronization() {
- @Override
- public void beforeEnd() throws Exception {
- acknowledge();
- synchronizationRegistered = false;
- }
-
- @Override
- public void afterCommit() throws Exception {
- commit();
- synchronizationRegistered = false;
- }
-
- @Override
- public void afterRollback() throws Exception {
- rollback();
- synchronizationRegistered = false;
- }
- });
- }
+ registerSync();
}
deliveredCounter++;
@@ -976,6 +969,40 @@ public class ActiveMQMessageConsumer imp
}
}
+ private void registerSync() throws JMSException {
+ session.doStartTransaction();
+ if (!synchronizationRegistered) {
+ synchronizationRegistered = true;
+ session.getTransactionContext().addSynchronization(new
Synchronization() {
+ @Override
+ public void beforeEnd() throws Exception {
+ if (transactedIndividualAck) {
+ clearDispatchList();
+ waitForRedeliveries();
+ synchronized(deliveredMessages) {
+ rollbackOnFailedRecoveryRedelivery();
+ }
+ } else {
+ acknowledge();
+ }
+ synchronizationRegistered = false;
+ }
+
+ @Override
+ public void afterCommit() throws Exception {
+ commit();
+ synchronizationRegistered = false;
+ }
+
+ @Override
+ public void afterRollback() throws Exception {
+ rollback();
+ synchronizationRegistered = false;
+ }
+ });
+ }
+ }
+
/**
* Acknowledge all the messages that have been delivered to the client up
to
* this point.
@@ -1284,7 +1311,11 @@ public class ActiveMQMessageConsumer imp
poisonAck.setFirstMessageId(md.getMessage().getMessageId());
session.sendAck(poisonAck);
} else {
- ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+ if (transactedIndividualAck) {
+ immediateIndividualTransactedAck(md);
+ } else {
+ ackLater(md,
MessageAck.DELIVERED_ACK_TYPE);
+ }
}
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Sep 30 11:59:44 2011
@@ -593,17 +593,20 @@ public class BrokerService implements Se
tempDataStore.stop();
tempDataStore = null;
}
- stopper.stop(persistenceAdapter);
- persistenceAdapter = null;
- slave = true;
- if (isUseJmx()) {
- stopper.stop(getManagementContext());
- managementContext = null;
+ try {
+ stopper.stop(persistenceAdapter);
+ persistenceAdapter = null;
+ slave = true;
+ if (isUseJmx()) {
+ stopper.stop(getManagementContext());
+ managementContext = null;
+ }
+ // Clear SelectorParser cache to free memory
+ SelectorParser.clearCache();
+ } finally {
+ stopped.set(true);
+ stoppedLatch.countDown();
}
- // Clear SelectorParser cache to free memory
- SelectorParser.clearCache();
- stopped.set(true);
- stoppedLatch.countDown();
if (masterConnectorURI == null) {
// master start has not finished yet
if (slaveStartSignal.getCount() == 1) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Sep 30 11:59:44 2011
@@ -223,32 +223,7 @@ public abstract class PrefetchSubscripti
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
removeList.add(node);
} else {
- // setup a Synchronization to remove nodes from the
- // dispatched list.
- context.getTransaction().addSynchronization(
- new Synchronization() {
-
- @Override
- public void afterCommit()
- throws Exception {
- synchronized(dispatchLock) {
- dequeueCounter++;
- dispatched.remove(node);
-
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
- }
- }
-
- @Override
- public void afterRollback() throws
Exception {
- synchronized(dispatchLock) {
- if (isSlave()) {
-
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
- } else {
- // poisionAck will
decrement - otherwise still inflight on client
- }
- }
- }
- });
+ registerRemoveSync(context, node);
}
index++;
acknowledge(context, ack, node);
@@ -281,13 +256,17 @@ public abstract class PrefetchSubscripti
for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
if (ack.getLastMessageId().equals(messageId)) {
- // this should never be within a transaction
- dequeueCounter++;
-
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
- destination = node.getRegionDestination();
- acknowledge(context, ack, node);
- dispatched.remove(node);
+ // Don't remove the nodes until we are committed -
immediateAck option
+ if (!context.isInTransaction()) {
+ dequeueCounter++;
+
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+ dispatched.remove(node);
+ } else {
+ registerRemoveSync(context, node);
+ }
prefetchExtension = Math.max(0, prefetchExtension - 1);
+ acknowledge(context, ack, node);
+ destination = node.getRegionDestination();
callDispatchMatched = true;
break;
}
@@ -406,6 +385,35 @@ public abstract class PrefetchSubscripti
}
}
+ private void registerRemoveSync(ConnectionContext context, final
MessageReference node) {
+ // setup a Synchronization to remove nodes from the
+ // dispatched list.
+ context.getTransaction().addSynchronization(
+ new Synchronization() {
+
+ @Override
+ public void afterCommit()
+ throws Exception {
+ synchronized(dispatchLock) {
+ dequeueCounter++;
+ dispatched.remove(node);
+
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+ }
+ }
+
+ @Override
+ public void afterRollback() throws Exception {
+ synchronized(dispatchLock) {
+ if (isSlave()) {
+
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+ } else {
+ // poisionAck will decrement - otherwise still
inflight on client
+ }
+ }
+ }
+ });
+ }
+
/**
* Checks an ack versus the contents of the dispatched list.
*
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
Fri Sep 30 11:59:44 2011
@@ -500,6 +500,18 @@ public class KahaDBPersistenceAdapter im
letter.setForceRecoverIndex(forceRecoverIndex);
}
+ /**
+ * When true, persist the redelivery status such that the message
redelivery flag can survive a broker failure
+ * used with
org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean)
true
+ */
+ public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
+ letter.setRewriteOnRedelivery(rewriteOnRedelivery);
+ }
+
+ public boolean isRewriteOnRedelivery() {
+ return letter.isRewriteOnRedelivery();
+ }
+
public KahaDBStore getStore() {
return letter;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Fri Sep 30 11:59:44 2011
@@ -63,6 +63,7 @@ import org.apache.activemq.usage.MemoryU
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.util.ByteSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.journal.Location;
@@ -244,6 +245,57 @@ public class KahaDBStore extends Message
super.doStop(stopper);
}
+ void incrementRedeliveryAndReWrite(final String key, final KahaDestination
destination) throws IOException {
+ Location location;
+ this.indexLock.writeLock().lock();
+ try {
+ location = findMessageLocation(key, destination);
+ } finally {
+ this.indexLock.writeLock().unlock();
+ }
+
+ if (location != null) {
+ KahaAddMessageCommand addMessage = (KahaAddMessageCommand)
load(location);
+ Message message = (Message) wireFormat.unmarshal(new
DataInputStream(addMessage.getMessage().newInput()));
+
+ message.incrementRedeliveryCounter();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("rewriting: " + key + " with deliveryCount: " +
message.getRedeliveryCounter());
+ }
+ org.apache.activemq.util.ByteSequence packet =
wireFormat.marshal(message);
+ addMessage.setMessage(new Buffer(packet.getData(),
packet.getOffset(), packet.getLength()));
+
+ final Location rewriteLocation =
journal.write(toByteSequence(addMessage), true);
+
+ this.indexLock.writeLock().lock();
+ try {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ StoredDestination sd =
getStoredDestination(destination, tx);
+ Long sequence = sd.messageIdIndex.get(tx, key);
+ MessageKeys keys = sd.orderIndex.get(tx, sequence);
+ sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(),
sequence, new MessageKeys(keys.messageId, rewriteLocation));
+ }
+ });
+ } finally {
+ this.indexLock.writeLock().unlock();
+ }
+ }
+ }
+
+ private Location findMessageLocation(final String key, final
KahaDestination destination) throws IOException {
+ return pageFile.tx().execute(new Transaction.CallableClosure<Location,
IOException>() {
+ public Location execute(Transaction tx) throws IOException {
+ StoredDestination sd = getStoredDestination(destination, tx);
+ Long sequence = sd.messageIdIndex.get(tx, key);
+ if (sequence == null) {
+ return null;
+ }
+ return sd.orderIndex.get(tx, sequence).location;
+ }
+ });
+ }
+
protected StoreQueueTask removeQueueTask(KahaDBMessageStore store,
MessageId id) {
StoreQueueTask task = null;
synchronized (store.asyncTaskMap) {
@@ -390,16 +442,7 @@ public class KahaDBStore extends Message
Location location;
indexLock.readLock().lock();
try {
- location = pageFile.tx().execute(new
Transaction.CallableClosure<Location, IOException>() {
- public Location execute(Transaction tx) throws IOException
{
- StoredDestination sd = getStoredDestination(dest, tx);
- Long sequence = sd.messageIdIndex.get(tx, key);
- if (sequence == null) {
- return null;
- }
- return sd.orderIndex.get(tx, sequence).location;
- }
- });
+ location = findMessageLocation(key, dest);
}finally {
indexLock.readLock().unlock();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
Fri Sep 30 11:59:44 2011
@@ -407,7 +407,7 @@ public class KahaDBTransactionStore impl
return message;
}
@Override
- public Future run(ConnectionContext ctx) throws
IOException {
+ public Future<Object> run(ConnectionContext ctx) throws
IOException {
return destination.asyncAddTopicMessage(ctx, message);
}
@@ -454,7 +454,7 @@ public class KahaDBTransactionStore impl
if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction() ||
theStore.isConcurrentStoreAndDispatchTransactions()==false) {
- destination.removeAsyncMessage(context, ack);
+ destination.removeMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand(context) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Fri Sep 30 11:59:44 2011
@@ -94,7 +94,7 @@ import org.apache.kahadb.util.VariableMa
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MessageDatabase extends ServiceSupport implements
BrokerServiceAware {
+public abstract class MessageDatabase extends ServiceSupport implements
BrokerServiceAware {
protected BrokerService brokerService;
@@ -224,6 +224,7 @@ public class MessageDatabase extends Ser
private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
protected boolean forceRecoverIndex = false;
private final Object checkpointThreadLock = new Object();
+ private boolean rewriteOnRedelivery = false;
public MessageDatabase() {
}
@@ -400,24 +401,27 @@ public class MessageDatabase extends Ser
public void close() throws IOException, InterruptedException {
if( opened.compareAndSet(true, false)) {
- this.indexLock.writeLock().lock();
try {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx, true);
- }
- });
- pageFile.unload();
- metadata = new Metadata();
+ this.indexLock.writeLock().lock();
+ try {
+ pageFile.tx().execute(new
Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException
{
+ checkpointUpdate(tx, true);
+ }
+ });
+ pageFile.unload();
+ metadata = new Metadata();
+ } finally {
+ this.indexLock.writeLock().unlock();
+ }
+ journal.close();
+ synchronized (checkpointThreadLock) {
+ checkpointThread.join();
+ }
} finally {
- this.indexLock.writeLock().unlock();
+ lockFile.unlock();
+ lockFile=null;
}
- journal.close();
- synchronized (checkpointThreadLock) {
- checkpointThread.join();
- }
- lockFile.unlock();
- lockFile=null;
}
}
@@ -804,6 +808,14 @@ public class MessageDatabase extends Ser
return store(data, false, null,null);
}
+ public ByteSequence toByteSequence(JournalCommand<?> data) throws
IOException {
+ int size = data.serializedSizeFramed();
+ DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+ os.writeByte(data.type().getNumber());
+ data.writeFramed(os);
+ return os.toByteSequence();
+ }
+
/**
* All updated are are funneled through this method. The updates are
converted
* to a JournalMessage which is logged to the journal and then the data
from
@@ -815,13 +827,9 @@ public class MessageDatabase extends Ser
before.run();
}
try {
- int size = data.serializedSizeFramed();
- DataByteArrayOutputStream os = new DataByteArrayOutputStream(size
+ 1);
- os.writeByte(data.type().getNumber());
- data.writeFramed(os);
-
+ ByteSequence sequence = toByteSequence(data);
long start = System.currentTimeMillis();
- Location location = journal.write(os.toByteSequence(), sync);
+ Location location = journal.write(sequence, sync);
long start2 = System.currentTimeMillis();
process(data, location, after);
long end = System.currentTimeMillis();
@@ -1079,16 +1087,35 @@ public class MessageDatabase extends Ser
}
}
- protected void process(KahaRollbackCommand command, Location location) {
+ protected void process(KahaRollbackCommand command, Location location)
throws IOException {
TransactionId key =
TransactionIdConversion.convert(command.getTransactionInfo());
+ List<Operation> updates = null;
synchronized (inflightTransactions) {
- List<Operation> tx = inflightTransactions.remove(key);
- if (tx == null) {
- preparedTransactions.remove(key);
+ updates = inflightTransactions.remove(key);
+ if (updates == null) {
+ updates = preparedTransactions.remove(key);
}
}
+ if (isRewriteOnRedelivery()) {
+ persistRedeliveryCount(updates);
+ }
}
+ private void persistRedeliveryCount(List<Operation> updates) throws
IOException {
+ if (updates != null) {
+ for (Operation operation : updates) {
+ operation.getCommand().visit(new Visitor() {
+ @Override
+ public void visit(KahaRemoveMessageCommand command) throws
IOException {
+ incrementRedeliveryAndReWrite(command.getMessageId(),
command.getDestination());
+ }
+ });
+ }
+ }
+ }
+
+ abstract void incrementRedeliveryAndReWrite(String key, KahaDestination
destination) throws IOException;
+
// /////////////////////////////////////////////////////////////////
// These methods do the actual index updates.
// /////////////////////////////////////////////////////////////////
@@ -1981,10 +2008,12 @@ public class MessageDatabase extends Ser
return TransactionIdConversion.convert(transactionInfo);
}
- abstract class Operation {
+ abstract class Operation <T extends JournalCommand<T>> {
+ final T command;
final Location location;
- public Operation(Location location) {
+ public Operation(T command, Location location) {
+ this.command = command;
this.location = location;
}
@@ -1992,15 +2021,17 @@ public class MessageDatabase extends Ser
return location;
}
+ public T getCommand() {
+ return command;
+ }
+
abstract public void execute(Transaction tx) throws IOException;
}
- class AddOpperation extends Operation {
- final KahaAddMessageCommand command;
+ class AddOpperation extends Operation<KahaAddMessageCommand> {
public AddOpperation(KahaAddMessageCommand command, Location location)
{
- super(location);
- this.command = command;
+ super(command, location);
}
@Override
@@ -2008,27 +2039,18 @@ public class MessageDatabase extends Ser
upadateIndex(tx, command, location);
}
- public KahaAddMessageCommand getCommand() {
- return command;
- }
}
- class RemoveOpperation extends Operation {
- final KahaRemoveMessageCommand command;
+ class RemoveOpperation extends Operation<KahaRemoveMessageCommand> {
public RemoveOpperation(KahaRemoveMessageCommand command, Location
location) {
- super(location);
- this.command = command;
+ super(command, location);
}
@Override
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
-
- public KahaRemoveMessageCommand getCommand() {
- return command;
- }
}
// /////////////////////////////////////////////////////////////////
@@ -2247,6 +2269,14 @@ public class MessageDatabase extends Ser
this.databaseLockedWaitDelay = databaseLockedWaitDelay;
}
+ public boolean isRewriteOnRedelivery() {
+ return rewriteOnRedelivery;
+ }
+
+ public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
+ this.rewriteOnRedelivery = rewriteOnRedelivery;
+ }
+
// /////////////////////////////////////////////////////////////////
// Internal conversion methods.
// /////////////////////////////////////////////////////////////////
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java?rev=1177619&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
Fri Sep 30 11:59:44 2011
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.transport.failover.FailoverTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RedeliveryRestartTest extends BrokerRestartTestSupport {
+ private static final transient Logger LOG =
LoggerFactory.getLogger(RedeliveryRestartTest.class);
+
+ @Override
+ protected void configureBroker(BrokerService broker) throws Exception {
+ super.configureBroker(broker);
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter =
(KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+ kahaDBPersistenceAdapter.setRewriteOnRedelivery(true);
+ broker.addConnector("tcp://0.0.0.0:0");
+ }
+
+ public void testValidateRedeliveryFlagAfterRestart() throws Exception {
+
+ ConnectionFactory connectionFactory =
+ new ActiveMQConnectionFactory("failover:(" +
broker.getTransportConnectors().get(0).getPublishableConnectString() +
")?jms.immediateAck=true");
+ ActiveMQConnection connection = (ActiveMQConnection)
connectionFactory.createConnection();
+ connection.start();
+
+ populateDestination(10, queueName, connection);
+
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(queueName);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ TextMessage msg = null;
+ for (int i=0; i<5;i++) {
+ msg = (TextMessage) consumer.receive(20000);
+ LOG.info("not redelivered? got: " + msg);
+ assertNotNull("got the message", msg);
+ assertEquals("first delivery", 1,
msg.getLongProperty("JMSXDeliveryCount"));
+ assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+ }
+ session.rollback();
+ consumer.close();
+
+ restartBroker();
+
+ // make failover aware of the restarted broker's auto-assigned port
+ ((FailoverTransport)
connection.getTransport().narrow(FailoverTransport.class)).add(true,
broker.getTransportConnectors().get(0).getPublishableConnectString());
+
+ consumer = session.createConsumer(destination);
+ for (int i=0; i<5;i++) {
+ msg = (TextMessage) consumer.receive(4000);
+ LOG.info("redelivered? got: " + msg);
+ assertNotNull("got the message again", msg);
+ assertEquals("redelivery count survives restart", 2,
msg.getLongProperty("JMSXDeliveryCount"));
+ assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+ }
+ session.commit();
+
+ // consume the rest that were not redeliveries
+ for (int i=0; i<5;i++) {
+ msg = (TextMessage) consumer.receive(20000);
+ LOG.info("not redelivered? got: " + msg);
+ assertNotNull("got the message", msg);
+ assertEquals("first delivery", 1,
msg.getLongProperty("JMSXDeliveryCount"));
+ assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+ }
+ session.commit();
+
+ connection.close();
+ }
+
+ public void testValidateRedeliveryFlagAfterRecovery() throws Exception {
+ ConnectionFactory connectionFactory =
+ new
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+ "?jms.immediateAck=true");
+ ActiveMQConnection connection = (ActiveMQConnection)
connectionFactory.createConnection();
+ connection.start();
+
+ populateDestination(1, queueName, connection);
+
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Destination destination = session.createQueue(queueName);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ TextMessage msg = (TextMessage) consumer.receive(20000);
+ LOG.info("got: " + msg);
+ assertNotNull("got the message", msg);
+ assertEquals("first delivery", 1,
msg.getLongProperty("JMSXDeliveryCount"));
+ assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter =
(KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+
+ // close the journal so the broker stops with an IOException on the next
checkpoint, leaving it with a pending local transaction to recover
+ kahaDBPersistenceAdapter.getStore().getJournal().close();
+ broker.waitUntilStopped();
+
+ broker = createRestartedBroker();
+ broker.start();
+
+ connectionFactory =
+ new
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+ "?jms.immediateAck=true");
+ connection = (ActiveMQConnection) connectionFactory.createConnection();
+ connection.start();
+
+
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ consumer = session.createConsumer(destination);
+ msg = (TextMessage) consumer.receive(10000);
+ assertNotNull("got the message again", msg);
+ assertEquals("redelivery count survives restart", 2,
msg.getLongProperty("JMSXDeliveryCount"));
+ assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+
+ session.commit();
+ connection.close();
+ }
+
+ private void populateDestination(final int nbMessages,
+ final String destinationName,
javax.jms.Connection connection)
+ throws JMSException {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(destinationName);
+ MessageProducer producer = session.createProducer(destination);
+ for (int i = 1; i <= nbMessages; i++) {
+ producer.send(session.createTextMessage("<hello id='" + i +
"'/>"));
+ }
+ producer.close();
+ session.close();
+ }
+
+
+ public static Test suite() {
+ return suite(RedeliveryRestartTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
Fri Sep 30 11:59:44 2011
@@ -63,7 +63,6 @@ public class AMQ2736Test {
store.close();
} catch (Exception expectedLotsAsJournalBorked) {
}
- store.getLockFile().unlock();
broker.stop();
broker.waitUntilStopped();
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java?rev=1177619&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
Fri Sep 30 11:59:44 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.IOException;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest
{
+
+ public static Test suite() {
+ return suite(FailoverRedeliveryTransactionTest.class);
+ }
+
+ @Override
+ public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
+ super.configureConnectionFactory(factory);
+ factory.setImmediateAck(true);
+ }
+
+ @Override
+ public BrokerService createBroker(boolean deleteAllMessagesOnStartup,
String bindAddress) throws Exception {
+ BrokerService brokerService =
super.createBroker(deleteAllMessagesOnStartup, bindAddress);
+ configurePersistenceAdapter(brokerService);
+ return brokerService;
+ }
+
+ private void configurePersistenceAdapter(BrokerService brokerService)
throws IOException {
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter =
(KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter();
+ kahaDBPersistenceAdapter.setRewriteOnRedelivery(true);
+ }
+
+ @Override
+ public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService
broker) throws IOException {
+ PersistenceAdapter persistenceAdapter =
super.setDefaultPersistenceAdapter(broker);
+ configurePersistenceAdapter(broker);
+ return persistenceAdapter;
+ }
+
+ // no point rerunning these inherited tests; they are overridden as no-ops
+ @Override
+ public void testFailoverProducerCloseBeforeTransaction() throws Exception {
+ }
+
+ @Override
+ public void initCombosForTestFailoverCommitReplyLost() {
+ }
+
+ @Override
+ public void testFailoverCommitReplyLost() throws Exception {
+ }
+
+ @Override
+ public void initCombosForTestFailoverSendReplyLost() {
+ }
+
+ @Override
+ public void testFailoverSendReplyLost() throws Exception {
+ }
+
+ @Override
+ public void initCombosForTestFailoverConnectionSendReplyLost() {
+ }
+
+ @Override
+ public void testFailoverConnectionSendReplyLost() throws Exception {
+ }
+
+ @Override
+ public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled()
throws Exception {
+ }
+
+ @Override
+ public void testFailoverMultipleProducerCloseBeforeTransaction() throws
Exception {
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=1177619&r1=1177618&r2=1177619&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Fri Sep 30 11:59:44 2011
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.failover;
import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.TestSupport;
@@ -114,9 +115,14 @@ public class FailoverTransactionTest ext
return broker;
}
+ public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
+ // nothing to do by default; subclasses may override to customize the factory
+ }
+
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
@@ -170,6 +176,7 @@ public class FailoverTransactionTest ext
broker.start();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
@@ -222,6 +229,7 @@ public class FailoverTransactionTest ext
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -274,6 +282,7 @@ public class FailoverTransactionTest ext
broker.start();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url +
")?jms.watchTopicAdvisories=false");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -329,6 +338,7 @@ public class FailoverTransactionTest ext
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -400,6 +410,7 @@ public class FailoverTransactionTest ext
proxy.open();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() +
")?jms.watchTopicAdvisories=false");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -450,6 +461,7 @@ public class FailoverTransactionTest ext
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
connection = cf.createConnection();
connection.start();
Session session2 = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -466,6 +478,7 @@ public class FailoverTransactionTest ext
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled()
throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url +
")?trackTransactionProducers=false");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
@@ -489,6 +502,7 @@ public class FailoverTransactionTest ext
public void testFailoverMultipleProducerCloseBeforeTransaction() throws
Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
@@ -521,6 +535,7 @@ public class FailoverTransactionTest ext
public void testFailoverWithConnectionConsumer() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
@@ -573,6 +588,7 @@ public class FailoverTransactionTest ext
// as failure depends on hash order of state tracker recovery, do a
few times
for (int i = 0; i < 3; i++) {
try {
+ LOG.info("Iteration: " + i);
doTestFailoverConsumerAckLost(i);
} finally {
stopBroker();
@@ -612,6 +628,7 @@ public class FailoverTransactionTest ext
Vector<Connection> connections = new Vector<Connection>();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
connections.add(connection);
@@ -728,6 +745,7 @@ public class FailoverTransactionTest ext
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
connection = cf.createConnection();
connection.start();
Session sweeperSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -745,6 +763,7 @@ public class FailoverTransactionTest ext
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -782,6 +801,7 @@ public class FailoverTransactionTest ext
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url +
")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -830,6 +850,7 @@ public class FailoverTransactionTest ext
broker = createBroker(true);
broker.start();
ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url +
")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
+ configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final Session producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);