Author: gtully
Date: Wed Feb 10 11:34:35 2010
New Revision: 908453
URL: http://svn.apache.org/viewvc?rev=908453&view=rev
Log:
svn merge -c 906450 https://svn.apache.org/repos/asf/activemq/trunk - further
evolution of resolution for https://issues.apache.org/activemq/browse/AMQ-2590
- indoubt transactions are now rolledback, pending transactions can wait for
jms.consumerFailoverRedeliveryWaitPeriod for redeliveries before rolling back.
If previously delivered messages are not replayed the transaction is rolledback
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Wed Feb 10 11:34:35 2010
@@ -188,6 +188,7 @@
private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner;
protected CountDownLatch transportInterruptionProcessingComplete;
+ private long consumerFailoverRedeliveryWaitPeriod;
/**
* Construct an <code>ActiveMQConnection</code>
@@ -2244,7 +2245,7 @@
protected void waitForTransportInterruptionProcessing() throws
InterruptedException {
if (transportInterruptionProcessingComplete != null) {
- while (!closed.get() && !transportFailed.get() &&
!transportInterruptionProcessingComplete.await(15, TimeUnit.SECONDS)) {
+ while (!closed.get() && !transportFailed.get() &&
!transportInterruptionProcessingComplete.await(10, TimeUnit.SECONDS)) {
LOG.warn("dispatch paused, waiting for outstanding dispatch
interruption processing (" + transportInterruptionProcessingComplete.getCount()
+ ") to complete..");
}
synchronized (this) {
@@ -2258,4 +2259,35 @@
transportInterruptionProcessingComplete.countDown();
}
}
+
+ private void signalInterruptionProcessingComplete() throws
InterruptedException {
+ if (transportInterruptionProcessingComplete.await(0,
TimeUnit.SECONDS)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("transportInterruptionProcessingComplete for: " +
this.getConnectionInfo().getConnectionId());
+ }
+ synchronized (this) {
+ transportInterruptionProcessingComplete = null;
+ FailoverTransport failoverTransport =
transport.narrow(FailoverTransport.class);
+ if (failoverTransport != null) {
+
failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("notified failover transport (" +
failoverTransport +") of interruption completion for: " +
this.getConnectionInfo().getConnectionId());
+ }
+ }
+ }
+ }
+ }
+
+ /*
+ * specify the amount of time in milliseconds that a consumer with a
transaction pending recovery
+ * will wait to receive re dispatched messages.
+ * default value is 0 so there is no wait by default.
+ */
+ public void setConsumerFailoverRedeliveryWaitPeriod(long
consumerFailoverRedeliveryWaitPeriod) {
+ this.consumerFailoverRedeliveryWaitPeriod =
consumerFailoverRedeliveryWaitPeriod;
+ }
+
+ public long getConsumerFailoverRedeliveryWaitPeriod() {
+ return consumerFailoverRedeliveryWaitPeriod;
+ }
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Wed Feb 10 11:34:35 2010
@@ -114,6 +114,7 @@
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber =
ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
private boolean useDedicatedTaskRunner;
+ private long consumerFailoverRedeliveryWaitPeriod = 0;
// /////////////////////////////////////////////
//
@@ -315,6 +316,7 @@
connection.setAuditDepth(getAuditDepth());
connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
+
connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
if (transportListener != null) {
connection.addTransportListener(transportListener);
}
@@ -913,4 +915,12 @@
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
+
+ public void setConsumerFailoverRedeliveryWaitPeriod(long
consumerFailoverRedeliveryWaitPeriod) {
+ this.consumerFailoverRedeliveryWaitPeriod =
consumerFailoverRedeliveryWaitPeriod;
+ }
+
+ public long getConsumerFailoverRedeliveryWaitPeriod() {
+ return consumerFailoverRedeliveryWaitPeriod;
+ }
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Wed Feb 10 11:34:35 2010
@@ -112,6 +112,7 @@
// not been acknowledged. It's kept in reverse order since we
// Always walk list in reverse order.
private final LinkedList<MessageDispatch> deliveredMessages = new
LinkedList<MessageDispatch>();
+ // track duplicate deliveries in a transaction such that the tx integrity
can be validated
private HashMap<MessageId, Boolean> previouslyDeliveredMessages;
private int deliveredCounter;
private int additionalWindowSize;
@@ -141,6 +142,8 @@
private long optimizeAckTimestamp = System.currentTimeMillis();
private long optimizeAckTimeout = 300;
+ private long failoverRedeliveryWaitPeriod = 0;
+ private boolean rollbackInitiated;
/**
* Create a MessageConsumer
@@ -228,7 +231,7 @@
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge()
&& session.isAutoAcknowledge()
&& !info.isBrowser();
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
-
+ this.failoverRedeliveryWaitPeriod =
session.connection.getConsumerFailoverRedeliveryWaitPeriod();
if (messageListener != null) {
setMessageListener(messageListener);
}
@@ -948,6 +951,7 @@
*/
public void acknowledge() throws JMSException {
clearDispatchList();
+ waitForRedeliveries();
synchronized(deliveredMessages) {
// Acknowledge all messages so far.
MessageAck ack =
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -972,51 +976,67 @@
}
}
+ private void waitForRedeliveries() {
+ if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages !=
null) {
+ long expiry = System.currentTimeMillis() +
failoverRedeliveryWaitPeriod;
+ int numberNotReplayed;
+ do {
+ numberNotReplayed = 0;
+ synchronized(deliveredMessages) {
+ if (previouslyDeliveredMessages != null) {
+ for (Entry<MessageId, Boolean> entry:
previouslyDeliveredMessages.entrySet()) {
+ if (!entry.getValue()) {
+ numberNotReplayed++;
+ }
+ }
+ }
+ }
+ if (numberNotReplayed > 0) {
+ LOG.info("waiting for redelivery of " + numberNotReplayed
+ " to consumer :" + this.getConsumerId());
+ try {
+ Thread.sleep(Math.max(500,
failoverRedeliveryWaitPeriod/4));
+ } catch (InterruptedException outOfhere) {
+ break;
+ }
+ }
+ } while (numberNotReplayed > 0 && expiry <
System.currentTimeMillis());
+ }
+ }
+
/*
* called with deliveredMessages locked
*/
private void rollbackOnFailedRecoveryRedelivery() throws JMSException {
if (previouslyDeliveredMessages != null) {
+ if (rollbackInitiated) {
+ // second call from rollback, nothing more to do
+ // REVISIT - should beforeEnd be called again by transaction
context?
+ rollbackInitiated = false;
+ return;
+ }
// if any previously delivered messages was not re-delivered,
transaction is invalid and must rollback
// as messages have been dispatched else where.
int numberNotReplayed = 0;
for (Entry<MessageId, Boolean> entry:
previouslyDeliveredMessages.entrySet()) {
if (!entry.getValue()) {
numberNotReplayed++;
- // allow outstanding messages to get delivered again
- removeFromDeliveredMessages(entry.getKey());
if (LOG.isDebugEnabled()) {
LOG.debug("previously delivered message has not been
replayed in transaction, id: " + entry.getKey());
}
}
}
- clearPreviouslyDelivered();
if (numberNotReplayed > 0) {
String message = "rolling back transaction post failover
recovery. " + numberNotReplayed
+ " previously delivered message(s) not replayed to
consumer: " + this.getConsumerId();
LOG.warn(message);
+ rollbackInitiated = true;
throw new TransactionRolledBackException(message);
}
}
}
- /*
- * called with deliveredMessages locked
- */
- private void removeFromDeliveredMessages(MessageId key) {
- ListIterator<MessageDispatch> iterator =
deliveredMessages.listIterator(deliveredMessages.size());
- while (iterator.hasPrevious()) {
- MessageDispatch candidate = iterator.previous();
- if (key.equals(candidate.getMessage().getMessageId())) {
- session.connection.rollbackDuplicate(this,
candidate.getMessage());
- iterator.remove();
- break;
- }
- }
- }
-
void acknowledge(MessageDispatch md) throws JMSException {
MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
session.sendAck(ack);
@@ -1049,7 +1069,7 @@
}
}
synchronized(deliveredMessages) {
- clearPreviouslyDelivered();
+ rollbackPreviouslyDeliveredAndNotRedelivered();
if (deliveredMessages.isEmpty()) {
return;
}
@@ -1126,6 +1146,37 @@
}
/*
+ * called with unconsumedMessages && deliveredMessages locked
+ * remove any message not re-delivered as they can't be replayed to this
+ * consumer on rollback
+ */
+ private void rollbackPreviouslyDeliveredAndNotRedelivered() {
+ if (previouslyDeliveredMessages != null) {
+ for (Entry<MessageId, Boolean> entry:
previouslyDeliveredMessages.entrySet()) {
+ if (!entry.getValue()) {
+ removeFromDeliveredMessages(entry.getKey());
+ }
+ }
+ rollbackInitiated = false;
+ clearPreviouslyDelivered();
+ }
+ }
+
+ /*
+ * called with deliveredMessages locked
+ */
+ private void removeFromDeliveredMessages(MessageId key) {
+ ListIterator<MessageDispatch> iterator =
deliveredMessages.listIterator(deliveredMessages.size());
+ while (iterator.hasPrevious()) {
+ MessageDispatch candidate = iterator.previous();
+ if (key.equals(candidate.getMessage().getMessageId())) {
+ session.connection.rollbackDuplicate(this,
candidate.getMessage());
+ iterator.remove();
+ break;
+ }
+ }
+ }
+ /*
* called with deliveredMessages locked
*/
private void clearPreviouslyDelivered() {
@@ -1170,7 +1221,7 @@
} else {
if (!session.isTransacted()) {
if (LOG.isDebugEnabled()) {
- LOG.debug(getConsumerId() + " ignoring(auto
acking) duplicate: " + md.getMessage());
+ LOG.debug(getConsumerId() + " ignoring (auto
acking) duplicate: " + md.getMessage());
}
MessageAck ack = new MessageAck(md,
MessageAck.STANDARD_ACK_TYPE, 1);
session.sendAck(ack);
@@ -1178,12 +1229,24 @@
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking
transacted redlivery of duplicate: " + md.getMessage());
}
+ boolean needsPoisonAck = false;
synchronized (deliveredMessages) {
if (previouslyDeliveredMessages != null) {
previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
+ } else {
+ // existing transaction gone but still a
duplicate!, lets mark as poison ftm,
+ // possibly could allow redelivery..
+ needsPoisonAck = true;
}
}
- ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+ if (needsPoisonAck) {
+ LOG.warn("acking as poison, duplicate
transacted delivery but no recovering transaction for: " + md);
+ MessageAck poisonAck = new MessageAck(md,
MessageAck.POSION_ACK_TYPE, 1);
+
poisonAck.setFirstMessageId(md.getMessage().getMessageId());
+ session.sendAck(poisonAck);
+ } else {
+ ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+ }
}
}
}
@@ -1197,7 +1260,7 @@
}
}
- // async (on next call) clear delivered as they will be auto-acked as
duplicates if they arrive again
+ // async (on next call) clear or track delivered as they may be flagged as
duplicates if they arrive again
private void clearDispatchList() {
if (clearDispatchList) {
synchronized (deliveredMessages) {
@@ -1205,7 +1268,7 @@
if (!deliveredMessages.isEmpty()) {
if (session.isTransacted()) {
if (LOG.isDebugEnabled()) {
- LOG.debug(getConsumerId() + " tracking
delivered list (" + deliveredMessages.size() + ") on transport interrupt");
+ LOG.debug(getConsumerId() + " tracking
existing transacted delivered list (" + deliveredMessages.size() + ") on
transport interrupt");
}
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new
HashMap<MessageId, Boolean>();
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
Wed Feb 10 11:34:35 2010
@@ -293,11 +293,21 @@
TransactionInfo info = new TransactionInfo(getConnectionId(),
transactionId, TransactionInfo.COMMIT_ONE_PHASE);
this.transactionId = null;
// Notify the listener that the tx was committed back
- syncSendPacketWithInterruptionHandling(info);
- if (localTransactionEventListener != null) {
- localTransactionEventListener.commitEvent();
+ try {
+ syncSendPacketWithInterruptionHandling(info);
+ if (localTransactionEventListener != null) {
+ localTransactionEventListener.commitEvent();
+ }
+ afterCommit();
+ } catch (JMSException cause) {
+ LOG.info("commit failed for transaction " +
info.getTransactionId(), cause);
+ if (localTransactionEventListener != null) {
+ localTransactionEventListener.rollbackEvent();
+ }
+ afterRollback();
+ throw cause;
}
- afterCommit();
+
}
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
Wed Feb 10 11:34:35 2010
@@ -23,12 +23,15 @@
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.TransactionRolledBackException;
+
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
@@ -140,22 +143,24 @@
}
private void restoreTransactions(Transport transport, ConnectionState
connectionState) throws IOException {
- Vector<Command> toIgnore = new Vector<Command>();
+ Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
for (TransactionState transactionState :
connectionState.getTransactionStates()) {
if (LOG.isDebugEnabled()) {
LOG.debug("tx: " + transactionState.getId());
}
- // ignore any empty (ack) transaction
- if (transactionState.getCommands().size() == 2) {
- Command lastCommand = transactionState.getCommands().get(1);
+ // rollback any completed transactions - no way to know if commit
got there
+ // or if reply went missing
+ //
+ if (!transactionState.getCommands().isEmpty()) {
+ Command lastCommand =
transactionState.getCommands().get(transactionState.getCommands().size() - 1);
if (lastCommand instanceof TransactionInfo) {
TransactionInfo transactionInfo = (TransactionInfo)
lastCommand;
if (transactionInfo.getType() ==
TransactionInfo.COMMIT_ONE_PHASE) {
if (LOG.isDebugEnabled()) {
- LOG.debug("not replaying empty (ack) tx: " +
transactionState.getId());
+ LOG.debug("rolling back potentially completed tx:
" + transactionState.getId());
}
- toIgnore.add(lastCommand);
+ toRollback.add(transactionInfo);
continue;
}
}
@@ -184,9 +189,10 @@
}
}
- for (Command command: toIgnore) {
+ for (TransactionInfo command: toRollback) {
// respond to the outstanding commit
- Response response = new Response();
+ ExceptionResponse response = new ExceptionResponse();
+ response.setException(new
TransactionRolledBackException("Transaction completion in doubt due to
failover. Forcing rollback of " + command.getTransactionId()));
response.setCorrelationId(command.getCommandId());
transport.getTransportListener().onCommand(response);
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Wed Feb 10 11:34:35 2010
@@ -212,7 +212,9 @@
failedConnectTransportURI=connectedTransportURI;
connectedTransportURI = null;
connected=false;
-
+
+ stateTracker.transportInterrupted();
+
// notify before any reconnect attempt so ack state can be
whacked
if (transportListener != null) {
transportListener.transportInterupted();
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
Wed Feb 10 11:34:35 2010
@@ -16,13 +16,16 @@
*/
package org.apache.activemq.transport.failover;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -58,6 +61,7 @@
private static final Log LOG =
LogFactory.getLog(FailoverConsumerOutstandingCommitTest.class);
private static final String QUEUE_NAME =
"FailoverWithOutstandingCommit";
+ private static final String MESSAGE_TEXT = "Test message ";
private String url = "tcp://localhost:61616";
final int prefetch = 10;
BrokerService broker;
@@ -133,7 +137,7 @@
connection.start();
final Session producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- final Queue destination = producerSession.createQueue(QUEUE_NAME +
"?jms.consumer.prefetch=" + prefetch);
+ final Queue destination = producerSession.createQueue(QUEUE_NAME +
"?consumer.prefetchSize=" + prefetch);
final Session consumerSession = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
@@ -167,13 +171,105 @@
connection.close();
}
+
+ @Test
+ public void TestFailoverConsumerOutstandingSendTxIncomplete() throws
Exception {
+ doTestFailoverConsumerOutstandingSendTx(false);
+ }
+
+ @Test
+ public void TestFailoverConsumerOutstandingSendTxComplete() throws
Exception {
+ doTestFailoverConsumerOutstandingSendTx(true);
+ }
+
+ public void doTestFailoverConsumerOutstandingSendTx(final boolean
doActualBrokerCommit) throws Exception {
+ final boolean watchTopicAdvisories = true;
+ broker = createBroker(true);
+
+ broker.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() {
+ @Override
+ public void commitTransaction(ConnectionContext context,
+ TransactionId xid, boolean onePhase) throws Exception {
+ if (doActualBrokerCommit) {
+ LOG.info("doing actual broker commit...");
+ super.commitTransaction(context, xid, onePhase);
+ }
+ // so commit will hang as if reply is lost
+ context.setDontSendReponse(true);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("Stopping broker before commit...");
+ try {
+ broker.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ } });
+ broker.start();
+
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ cf.setWatchTopicAdvisories(watchTopicAdvisories);
+ cf.setDispatchAsync(false);
+
+ final ActiveMQConnection connection = (ActiveMQConnection)
cf.createConnection();
+ connection.start();
+
+ final Session producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = producerSession.createQueue(QUEUE_NAME
+ + "?consumer.prefetchSize=" + prefetch);
+
+ final Session consumerSession = connection.createSession(true,
Session.SESSION_TRANSACTED);
+
+ final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+ final CountDownLatch messagesReceived = new CountDownLatch(3);
+ final AtomicBoolean gotCommitException = new AtomicBoolean(false);
+ final ArrayList<TextMessage> receivedMessages = new
ArrayList<TextMessage>();
+ final MessageConsumer testConsumer =
consumerSession.createConsumer(destination);
+ testConsumer.setMessageListener(new MessageListener() {
+
+ public void onMessage(Message message) {
+ LOG.info("consume one and commit: " + message);
+ assertNotNull("got message", message);
+ receivedMessages.add((TextMessage) message);
+ try {
+ produceMessage(consumerSession, destination, 1);
+ consumerSession.commit();
+ } catch (JMSException e) {
+ LOG.info("commit exception", e);
+ gotCommitException.set(true);
+ }
+ commitDoneLatch.countDown();
+ messagesReceived.countDown();
+ LOG.info("done commit");
+ }
+ });
+
+ produceMessage(producerSession, destination, prefetch * 2);
+
+ // will be stopped by the plugin
+ broker.waitUntilStopped();
+ broker = createBroker(false);
+ broker.start();
+
+ assertTrue("commit done through failover", commitDoneLatch.await(20,
TimeUnit.SECONDS));
+ assertTrue("commit failed", gotCommitException.get());
+ assertTrue("another message was received after failover",
messagesReceived.await(20, TimeUnit.SECONDS));
+ assertEquals("get message 0 first", MESSAGE_TEXT + "0",
receivedMessages.get(0).getText());
+ // it was redelivered
+ assertEquals("get message 0 second", MESSAGE_TEXT + "0",
receivedMessages.get(1).getText());
+ assertTrue("another message was received", messagesReceived.await(20,
TimeUnit.SECONDS));
+ assertEquals("get message 1 eventually", MESSAGE_TEXT + "1",
receivedMessages.get(2).getText());
@Test
public void testRollbackFailoverConsumerTx() throws Exception {
broker = createBroker(true);
broker.start();
- ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ cf.setConsumerFailoverRedeliveryWaitPeriod(10000);
final ActiveMQConnection connection = (ActiveMQConnection)
cf.createConnection();
connection.start();
@@ -228,7 +324,7 @@
throws JMSException {
MessageProducer producer = producerSession.createProducer(destination);
for (int i=0; i<count; i++) {
- TextMessage message = producerSession.createTextMessage("Test
message " + i);
+ TextMessage message =
producerSession.createTextMessage(MESSAGE_TEXT + i);
producer.send(message);
}
producer.close();
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=908453&r1=908452&r2=908453&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Wed Feb 10 11:34:35 2010
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.transport.failover;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -40,6 +39,7 @@
import javax.jms.TransactionRolledBackException;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
@@ -55,6 +55,7 @@
import org.junit.Test;
// see https://issues.apache.org/activemq/browse/AMQ-2473
+// https://issues.apache.org/activemq/browse/AMQ-2590
public class FailoverTransactionTest {
private static final Log LOG =
LogFactory.getLog(FailoverTransactionTest.class);
@@ -167,11 +168,12 @@
LOG.info("doing async commit...");
try {
session.commit();
- commitDoneLatch.countDown();
- LOG.info("done async commit");
- } catch (Exception e) {
- e.printStackTrace();
+ } catch (JMSException e) {
+ assertTrue(e instanceof TransactionRolledBackException);
+ LOG.info("got commit exception: ", e);
}
+ commitDoneLatch.countDown();
+ LOG.info("done async commit");
}
});
@@ -285,110 +287,8 @@
}
session.commit();
connection.close();
- }
-
- @Test
- public void testFailoverConsumerCommitLost() throws Exception {
- final int adapter = 0;
- broker = createBroker(true);
- setPersistenceAdapter(adapter);
-
- broker.setPlugins(new BrokerPlugin[] {
- new BrokerPluginSupport() {
-
- @Override
- public void commitTransaction(ConnectionContext context,
- TransactionId xid, boolean onePhase) throws
Exception {
- super.commitTransaction(context, xid, onePhase);
- // so commit will hang as if reply is lost
- context.setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new
Runnable() {
- public void run() {
- LOG.info("Stopping broker post commit...");
- try {
- broker.stop();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
- });
- broker.start();
-
- ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
- Connection connection = cf.createConnection();
- connection.start();
- final Session producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- final Session consumerSession = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
- Queue destination = producerSession.createQueue(QUEUE_NAME);
-
- final MessageConsumer consumer =
consumerSession.createConsumer(destination);
-
- produceMessage(producerSession, destination);
-
- final Vector<Message> receivedMessages = new Vector<Message>();
- final CountDownLatch commitDoneLatch = new CountDownLatch(1);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- public void run() {
- LOG.info("doing async commit after consume...");
- try {
- Message msg = consumer.receive(20000);
- LOG.info("Got message: " + msg);
- receivedMessages.add(msg);
- consumerSession.commit();
- commitDoneLatch.countDown();
- LOG.info("done async commit");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
-
-
- // will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false);
- setPersistenceAdapter(adapter);
- broker.start();
-
- assertTrue("tx committed trough failover",
commitDoneLatch.await(30, TimeUnit.SECONDS));
-
- assertEquals("we got a message", 1, receivedMessages.size());
-
- // new transaction
- Message msg = consumer.receive(20000);
- LOG.info("Received: " + msg);
- assertNull("we did not get a duplicate message", msg);
- consumerSession.commit();
- consumer.close();
- connection.close();
-
- // ensure no dangling messages with fresh broker etc
- broker.stop();
- broker.waitUntilStopped();
-
- LOG.info("Checking for remaining/hung messages..");
- broker = createBroker(false);
- setPersistenceAdapter(adapter);
- broker.start();
-
- // after restart, ensure no dangling messages
- cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
- connection = cf.createConnection();
- connection.start();
- Session session2 = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer2 = session2.createConsumer(destination);
- msg = consumer2.receive(1000);
- if (msg == null) {
- msg = consumer2.receive(5000);
- }
- LOG.info("Received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
- connection.close();
}
-
+
@Test
public void testFailoverConsumerAckLost() throws Exception {
// as failure depends on hash order, do a few times
@@ -563,6 +463,93 @@
connection.close();
}
+ @Test
+ public void testAutoRollbackWithMissingRedeliveries() throws Exception {
+ broker = createBroker(true);
+ broker.start();
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url + ")");
+ Connection connection = cf.createConnection();
+ connection.start();
+ final Session producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = producerSession.createQueue(QUEUE_NAME +
"?consumer.prefetchSize=1");
+ final Session consumerSession = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+ produceMessage(producerSession, destination);
+
+ Message msg = consumer.receive(20000);
+ assertNotNull(msg);
+
+ broker.stop();
+ broker = createBroker(false);
+ // use empty jdbc store so that default wait for redeliveries will
timeout after failover
+ setPersistenceAdapter(1);
+ broker.start();
+
+ try {
+ consumerSession.commit();
+ } catch (JMSException expectedRolledback) {
+ assertTrue(expectedRolledback instanceof
TransactionRolledBackException);
+ }
+
+ broker.stop();
+ broker = createBroker(false);
+ broker.start();
+
+ assertNotNull("should get rolledback message from original restarted
broker", consumer.receive(20000));
+ connection.close();
+ }
+
+
+ @Test
+ public void testWaitForMissingRedeliveries() throws Exception {
+ LOG.info("testWaitForMissingRedeliveries()");
+ broker = createBroker(true);
+ broker.start();
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("failover:(" + url +
")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
+ Connection connection = cf.createConnection();
+ connection.start();
+ final Session producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = producerSession.createQueue(QUEUE_NAME);
+ final Session consumerSession = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+ produceMessage(producerSession, destination);
+ Message msg = consumer.receive(20000);
+ if (msg == null) {
+ AutoFailTestSupport.dumpAllThreads("missing-");
+ }
+ assertNotNull("got message just produced", msg);
+
+ broker.stop();
+ broker = createBroker(false);
+ // use empty jdbc store so that wait for re-deliveries occur when
failover resumes
+ setPersistenceAdapter(1);
+ broker.start();
+
+ final CountDownLatch commitDone = new CountDownLatch(1);
+ // will block pending re-deliveries
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("doing async commit...");
+ try {
+ consumerSession.commit();
+ commitDone.countDown();
+ } catch (JMSException ignored) {
+ }
+ }
+ });
+
+ broker.stop();
+ broker = createBroker(false);
+ broker.start();
+
+ assertTrue("commit was successfull", commitDone.await(30,
TimeUnit.SECONDS));
+
+ assertNull("should not get committed message", consumer.receive(5000));
+ connection.close();
+ }
+
private void produceMessage(final Session producerSession, Queue
destination)
throws JMSException {
MessageProducer producer =
producerSession.createProducer(destination);