Updated Branches: refs/heads/trunk 3af70ba28 -> 0f0c0d676
https://issues.apache.org/jira/browse/AMQ-4952 - deal with duplicates by redirecting to dlq when they are detected by the cursors such that they don't linger for redispatch after a restart. Networks are the main culprit for such duplicates b/c the producer audit traps regular failover resends Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f92d45be Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f92d45be Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f92d45be Branch: refs/heads/trunk Commit: f92d45bec1be76b1e38d3be2d9700bbdd9054d17 Parents: 3af70ba Author: gtully <[email protected]> Authored: Tue Jan 7 16:43:56 2014 +0000 Committer: gtully <[email protected]> Committed: Tue Jan 7 16:52:39 2014 +0000 ---------------------------------------------------------------------- .../activemq/broker/region/BaseDestination.java | 36 +- .../activemq/broker/region/Destination.java | 2 + .../broker/region/DestinationFilter.java | 5 + .../apache/activemq/broker/region/Queue.java | 2 + .../apache/activemq/broker/region/Topic.java | 11 + .../region/cursors/AbstractStoreCursor.java | 37 +- .../region/cursors/QueueStorePrefetch.java | 1 + .../region/cursors/TopicStorePrefetch.java | 7 +- .../activemq/store/kahadb/MessageDatabase.java | 2 +- .../kahadb/disk/journal/DataFileAccessor.java | 2 +- .../org/apache/activemq/bugs/AMQ4952Test.java | 475 +++++++++++++++++++ .../activemq/bugs/ConnectionPerMessageTest.java | 7 +- .../usecases/ThreeBrokerTopicNetworkTest.java | 15 +- 13 files changed, 578 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index f350098..7bbe360 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.Arrays; import java.util.List; import javax.jms.ResourceAllocationException; import org.apache.activemq.advisory.AdvisorySupport; @@ -32,6 +33,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; @@ -761,7 +763,7 @@ public abstract class BaseDestination implements Destination { return hasRegularConsumers; } - protected ConnectionContext createConnectionContext() { + public ConnectionContext createConnectionContext() { ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); answer.setBroker(this.broker); answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); @@ -790,4 +792,36 @@ public abstract class BaseDestination implements Destination { return getDeadLetterStrategy().isDLQ(this.getActiveMQDestination()); } + public void duplicateFromStore(Message message, Subscription durableSub) { + ConnectionContext connectionContext = createConnectionContext(); + + TransactionId transactionId = message.getTransactionId(); + if (transactionId != null && transactionId.isXATransaction()) { + try { + List<TransactionId> preparedTx = Arrays.asList(broker.getRoot().getPreparedTransactions(connectionContext)); + getLog().trace("processing duplicate in {}, prepared {} ", transactionId, preparedTx); + if (!preparedTx.contains(transactionId)) { + // duplicates from past transactions expected after org.apache.activemq.broker.region.Destination#clearPendingMessages + // till they are acked + getLog().debug("duplicate message from store {}, from historical transaction {}, ignoring", message.getMessageId(), transactionId); + return; + } + } catch (Exception ignored) { + getLog().debug("failed to determine state of transaction {} on duplicate message {}", transactionId, message.getMessageId(), ignored); + } + } + + getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId()); + Throwable cause = new Throwable("duplicate from store for " + destination); + message.setRegionDestination(this); + broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); + MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1); + messageAck.setPoisonCause(cause); + try { + acknowledge(connectionContext, durableSub, messageAck, message); + } catch (IOException e) { + getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index 93502e7..ddcba84 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -239,4 +239,6 @@ public interface Destination extends Service, Task, Message.MessageDestination { public void clearPendingMessages(); public boolean isDLQ(); + + void duplicateFromStore(Message message, Subscription subscription); } http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index ecf6cf7..7e21bcc 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -388,6 +388,11 @@ public class DestinationFilter implements Destination { return next.isDLQ(); } + @Override + public void duplicateFromStore(Message message, Subscription subscription) { + next.duplicateFromStore(message, subscription); + } + public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { if (next instanceof DestinationFilter) { DestinationFilter filter = (DestinationFilter) next; http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 52d7282..18a4a69 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1747,6 +1747,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, MessageAck ack) throws IOException { + LOG.trace("ack of {} with {}", reference.getMessageId(), ack); reference.setAcked(true); // This sends the ack the the journal.. if (!ack.isInTransaction()) { @@ -2049,6 +2050,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s, (QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) { // Dispatch it. s.add(node); + LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId()); iterator.remove(); target = s; break; http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index ab0f8ce..f85ed66 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -805,6 +805,17 @@ public class Topic extends BaseDestination implements Task { } } + private void rollback(MessageId poisoned) { + dispatchLock.readLock().lock(); + try { + for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values()) { + durableTopicSubscription.getPending().rollback(poisoned); + } + } finally { + dispatchLock.readLock().unlock(); + } + } + public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() { return durableSubscribers; } http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 495d365..bb77afb 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -17,8 +17,10 @@ package org.apache.activemq.broker.region.cursors; import java.util.Iterator; +import java.util.LinkedList; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.TransactionId; @@ -93,17 +95,25 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i recovered = true; storeHasMessages = true; } else { - /* - * we should expect to get these - as the message is recorded as it before it goes into - * the cache. If subsequently, we pull out that message from the store (before its deleted) - * it will be a duplicate - but should be ignored - */ - LOG.trace("{} - cursor got duplicate: {}, {}", new Object[]{ this, message.getMessageId(), message.getPriority() }); + LOG.warn("{} - cursor got duplicate: {}, {}", new Object[]{ this, message.getMessageId(), message.getPriority() }); + duplicate(message); } return recovered; } - - + + // track for processing outside of store index lock so we can dlq + final LinkedList<Message> duplicatesFromStore = new LinkedList<Message>(); + private void duplicate(Message message) { + duplicatesFromStore.add(message); + } + + void dealWithDuplicates() { + for (Message message : duplicatesFromStore) { + regionDestination.duplicateFromStore(message, getSubscription()); + } + duplicatesFromStore.clear(); + } + public final synchronized void reset() { if (batchList.isEmpty()) { try { @@ -180,9 +190,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i lastCachedId = node.getMessageId(); lastTx = node.getMessage().getTransactionId(); } else { - // failed to recover, possible duplicate from concurrent dispatchPending, - // lets not recover further in case of out of order - disableCache = true; + LOG.debug(this + " duplicate add {}", node.getMessage(), new Throwable("duplicated detected")); + dealWithDuplicates(); } } } else { @@ -251,7 +260,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } protected final synchronized void fillBatch() { - LOG.trace("{} - fillBatch", this); + //LOG.trace("{} - fillBatch", this); if (batchResetNeeded) { resetSize(); setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size)); @@ -302,4 +311,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i protected abstract int getStoreSize(); protected abstract boolean isStoreEmpty(); + + public Subscription getSubscription() { + return null; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index b7fd289..c89b648 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -104,6 +104,7 @@ class QueueStorePrefetch extends AbstractStoreCursor { hadSpace = this.hasSpace(); if (!broker.getBrokerService().isPersistent() || hadSpace) { this.store.recoverNextMessages(this.maxBatchSize, this); + dealWithDuplicates(); // without the index lock } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index aec5c06..9e02e4e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -69,7 +69,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { @Override public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { - LOG.trace("recover: {}, priority: {}", message.getMessageId(), message.getPriority()); + LOG.trace("{} recover: {}, priority: {}", this, message.getMessageId(), message.getPriority()); boolean recovered = false; MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext(); messageEvaluationContext.setMessageReference(message); @@ -123,6 +123,11 @@ class TopicStorePrefetch extends AbstractStoreCursor { } @Override + public Subscription getSubscription() { + return subscription; + } + + @Override public String toString() { return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 8c8fe0f..2d2dd55 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1291,7 +1291,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } else { // If the message ID as indexed, then the broker asked us to // store a DUP message. Bad BOY! Don't do it, and log a warning. - LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId()); + LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); sd.messageIdIndex.put(tx, command.getMessageId(), previous); sd.locationIndex.remove(tx, location); rollbackStatsOnDuplicate(command.getDestination()); http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java index 7781b7e..2896196 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java @@ -90,7 +90,7 @@ final class DataFileAccessor { return new ByteSequence(data, 0, data.length); } catch (RuntimeException e) { - throw new IOException("Invalid location: " + location + ", : " + e); + throw new IOException("Invalid location: " + location + ", : " + e, e); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java new file mode 100644 index 0000000..06f60e2 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java @@ -0,0 +1,475 @@ + +package org.apache.activemq.bugs; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.*; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.*; +import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.Wait; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import javax.jms.Connection; +import javax.sql.DataSource; +import scala.actors.threadpool.Arrays; +import java.net.URI; +import java.sql.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.*; + +/** + * Test creates a broker network with two brokers - + * producerBroker (with a message producer attached) and consumerBroker (with consumer attached) + * <p/> + * Simulates network duplicate message by stopping and restarting the consumerBroker after message (with message ID ending in + * 120) is persisted to consumerBrokerstore BUT BEFORE ack sent to the producerBroker over the network connection. + * When the network connection is reestablished the producerBroker resends + * message (with messageID ending in 120). + * <p/> + * Expectation: + * <p/> + * With the following policy entries set, would expect the duplicate message to be read from the store + * and dispatched to the consumer - where the duplicate could be detected by consumer. + * <p/> + * PolicyEntry policy = new PolicyEntry(); + * policy.setQueue(">"); + * policy.setEnableAudit(false); + * policy.setUseCache(false); + * policy.setExpireMessagesPeriod(0); + * <p/> + * <p/> + * Note 1: Network needs to use replaywhenNoConsumers so enabling the networkAudit to avoid this scenario is not feasible. + * <p/> + * NOTE 2: Added a custom plugin to the consumerBroker so that the consumerBroker shutdown will occur after a message has been + * persisted to consumerBroker store but before an ACK is sent back to ProducerBroker. This is just a hack to ensure producerBroker will resend + * the message after shutdown. + */ + +public class AMQ4952Test extends TestCase { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4952Test.class); + + protected static final int MESSAGE_COUNT = 1; + + protected BrokerService consumerBroker; + protected BrokerService producerBroker; + + protected ActiveMQQueue QUEUE_NAME = new ActiveMQQueue("duptest.store"); + + private final CountDownLatch stopConsumerBroker = new CountDownLatch(1); + private final CountDownLatch consumerBrokerRestarted = new CountDownLatch(1); + private final CountDownLatch consumerRestartedAndMessageForwarded = new CountDownLatch(1); + + private EmbeddedDataSource localDataSource; + + + public void testConsumerBrokerRestart() throws Exception { + + Callable consumeMessageTask = new Callable() { + @Override + public Object call() throws Exception { + + int receivedMessageCount = 0; + + ActiveMQConnectionFactory consumerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2006)?randomize=false&backup=false"); + Connection consumerConnection = consumerFactory.createConnection(); + + try { + + consumerConnection.setClientID("consumer"); + consumerConnection.start(); + + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer messageConsumer = consumerSession.createConsumer(QUEUE_NAME); + + + while (true) { + TextMessage textMsg = (TextMessage) messageConsumer.receive(5000); + + if (textMsg == null) { + return receivedMessageCount; + } + + receivedMessageCount++; + LOG.info("*** receivedMessageCount {} message has MessageID {} ", receivedMessageCount, textMsg.getJMSMessageID()); + + // on first delivery ensure the message is pending an ack when it is resent from the producer broker + if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount == 1) { + LOG.info("Waiting for restart..."); + consumerRestartedAndMessageForwarded.await(90, TimeUnit.SECONDS); + } + + textMsg.acknowledge(); + + } + } finally { + consumerConnection.close(); + } + } + }; + + Runnable consumerBrokerResetTask = new Runnable() { + public void run() { + + try { + // wait for signal + stopConsumerBroker.await(); + + + LOG.info("********* STOPPING CONSUMER BROKER"); + + consumerBroker.stop(); + consumerBroker.waitUntilStopped(); + + + LOG.info("***** STARTING CONSUMER BROKER"); + // do not delete messages on startup + consumerBroker = createConsumerBroker(false); + + LOG.info("***** CONSUMER BROKER STARTED!!"); + consumerBrokerRestarted.countDown(); + + assertTrue("message forwarded on time", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("ProducerBroker totalMessageCount: " + producerBroker.getAdminView().getTotalMessageCount()); + return producerBroker.getAdminView().getTotalMessageCount() == 0; + } + })); + consumerRestartedAndMessageForwarded.countDown(); + + + } catch (Exception e) { + LOG.error("Exception when stopping/starting the consumerBroker ", e); + } + + + } + }; + + + ExecutorService executor = Executors.newFixedThreadPool(2); + + //start consumerBroker start/stop task + executor.execute(consumerBrokerResetTask); + + //start consuming messages + Future<Integer> numberOfConsumedMessage = executor.submit(consumeMessageTask); + + + produceMessages(); + + //Wait for consumer to finish + int totalMessagesConsumed = numberOfConsumedMessage.get(); + + StringBuffer contents = new StringBuffer(); + boolean messageInStore = isMessageInJDBCStore(localDataSource, contents); + LOG.debug("****number of messages received " + totalMessagesConsumed); + + assertEquals("number of messages received", 2, totalMessagesConsumed); + assertEquals("messages left in store", true, messageInStore); + assertTrue("message is in dlq: " + contents.toString(), contents.toString().contains("DLQ")); + + } + + private void produceMessages() throws JMSException { + + ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2003)?randomize=false&backup=false"); + Connection producerConnection = producerFactory.createConnection(); + + try { + producerConnection.setClientID("producer"); + producerConnection.start(); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final MessageProducer remoteProducer = producerSession.createProducer(QUEUE_NAME); + + int i = 0; + while (MESSAGE_COUNT > i) { + String payload = "test msg " + i; + TextMessage msg = producerSession.createTextMessage(payload); + remoteProducer.send(msg); + i++; + } + + } finally { + producerConnection.close(); + } + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + doSetUp(true); + } + + @Override + protected void tearDown() throws Exception { + doTearDown(); + super.tearDown(); + } + + protected void doTearDown() throws Exception { + + try { + consumerBroker.stop(); + } catch (Exception ex) { + } + try { + consumerBroker.stop(); + } catch (Exception ex) { + } + } + + protected void doSetUp(boolean deleteAllMessages) throws Exception { + producerBroker = createProducerBroker(); + consumerBroker = createConsumerBroker(true); + } + + + /** + * Producer broker + * listens on localhost:2003 + * networks to consumerBroker - localhost:2006 + * + * @return + * @throws Exception + */ + + protected BrokerService createProducerBroker() throws Exception { + + + String networkToPorts[] = new String[]{"2006"}; + HashMap<String, String> networkProps = new HashMap<String, String>(); + + networkProps.put("networkTTL", "10"); + networkProps.put("conduitSubscriptions", "true"); + networkProps.put("decreaseNetworkConsumerPriority", "true"); + networkProps.put("dynamicOnly", "true"); + + BrokerService broker = new BrokerService(); + broker.getManagementContext().setCreateConnector(false); + broker.setDeleteAllMessagesOnStartup(true); + broker.setBrokerName("BP"); + broker.setAdvisorySupport(false); + + // lazy init listener on broker start + TransportConnector transportConnector = new TransportConnector(); + transportConnector.setUri(new URI("tcp://localhost:2003")); + List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>(); + transportConnectors.add(transportConnector); + broker.setTransportConnectors(transportConnectors); + + + //network to consumerBroker + + if (networkToPorts != null && networkToPorts.length > 0) { + StringBuilder builder = new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false"); + NetworkConnector nc = broker.addNetworkConnector(builder.toString()); + if (networkProps != null) { + IntrospectionSupport.setProperties(nc, networkProps); + } + nc.setStaticallyIncludedDestinations(Arrays.asList(new ActiveMQQueue[]{QUEUE_NAME})); + } + + //Persistence adapter + + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + EmbeddedDataSource remoteDataSource = new EmbeddedDataSource(); + remoteDataSource.setDatabaseName("derbyDBRemoteBroker"); + remoteDataSource.setCreateDatabase("create"); + jdbc.setDataSource(remoteDataSource); + broker.setPersistenceAdapter(jdbc); + + //set Policy entries + PolicyEntry policy = new PolicyEntry(); + + policy.setQueue(">"); + policy.setEnableAudit(false); + policy.setUseCache(false); + policy.setExpireMessagesPeriod(0); + + // set replay with no consumers + ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = + new ConditionalNetworkBridgeFilterFactory(); + conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); + policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + + broker.start(); + broker.waitUntilStarted(); + + return broker; + } + + + /** + * consumerBroker + * - listens on localhost:2006 + * + * @param deleteMessages - drop messages when broker instance is created + * @return + * @throws Exception + */ + + protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception { + + String scheme = "tcp"; + String listenPort = "2006"; + + BrokerService broker = new BrokerService(); + broker.getManagementContext().setCreateConnector(false); + broker.setDeleteAllMessagesOnStartup(deleteMessages); + broker.setBrokerName("BC"); + // lazy init listener on broker start + TransportConnector transportConnector = new TransportConnector(); + transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort)); + List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>(); + transportConnectors.add(transportConnector); + broker.setTransportConnectors(transportConnectors); + + //policy entries + + PolicyEntry policy = new PolicyEntry(); + + policy.setQueue(">"); + policy.setUseCache(false); + policy.setExpireMessagesPeriod(0); + + // set replay with no consumers + ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = + new ConditionalNetworkBridgeFilterFactory(); + conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); + policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory); + + PolicyMap pMap = new PolicyMap(); + + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + + + // Persistence adapter + JDBCPersistenceAdapter localJDBCPersistentAdapter = new JDBCPersistenceAdapter(); + EmbeddedDataSource localDataSource = new EmbeddedDataSource(); + localDataSource.setDatabaseName("derbyDBLocalBroker"); + localDataSource.setCreateDatabase("create"); + localJDBCPersistentAdapter.setDataSource(localDataSource); + broker.setPersistenceAdapter(localJDBCPersistentAdapter); + + if (deleteMessages) { + // no plugin on restart + broker.setPlugins(new BrokerPlugin[]{new MyTestPlugin()}); + } + + this.localDataSource = localDataSource; + + broker.start(); + broker.waitUntilStarted(); + + return broker; + } + + + /** + * Query JDBC Store to see if messages are left + * + * @param dataSource + * @return + * @throws SQLException + */ + + private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) + throws SQLException { + + boolean tableHasData = false; + String query = "select * from ACTIVEMQ_MSGS"; + + java.sql.Connection conn = dataSource.getConnection(); + PreparedStatement s = conn.prepareStatement(query); + + ResultSet set = null; + + + + try { + StringBuffer headers = new StringBuffer(); + set = s.executeQuery(); + ResultSetMetaData metaData = set.getMetaData(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + + if (i == 1) { + headers.append("||"); + } + headers.append(metaData.getColumnName(i) + "||"); + } + LOG.error(headers.toString()); + + + while (set.next()) { + tableHasData = true; + + for (int i = 1; i <= metaData.getColumnCount(); i++) { + if (i == 1) { + stringBuffer.append("|"); + } + stringBuffer.append(set.getString(i) + "|"); + } + LOG.error(stringBuffer.toString()); + } + } finally { + try { + set.close(); + } catch (Throwable ignore) { + } + try { + s.close(); + } catch (Throwable ignore) { + } + + conn.close(); + } + + return tableHasData; + } + + + /** + * plugin used to ensure consumerbroker is restared before the network message from producerBroker is acked + */ + class MyTestPlugin implements BrokerPlugin { + + public Broker installPlugin(Broker broker) throws Exception { + return new MyTestBroker(broker); + } + + } + + class MyTestBroker extends BrokerFilter { + + public MyTestBroker(Broker next) { + super(next); + } + + public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception { + + super.send(producerExchange, messageSend); + LOG.error("Stopping broker on send: " +messageSend.getMessageId().getProducerSequenceId()); + stopConsumerBroker.countDown(); + producerExchange.getConnectionContext().setDontSendReponse(true); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java index e7ee4d9..8c580a9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java @@ -35,19 +35,19 @@ import org.slf4j.LoggerFactory; public class ConnectionPerMessageTest extends EmbeddedBrokerTestSupport { private static final Logger LOG = LoggerFactory.getLogger(ConnectionPerMessageTest.class); - private static final int COUNT = 20000; + private static final int COUNT = 2000; protected String bindAddress; public void testConnectionPerMessage() throws Exception { final String topicName = "test.topic"; - LOG.info("Initializing pooled connection factory for JMS to URL: " + LOG.info("Initializing connection factory for JMS to URL: " + bindAddress); final ActiveMQConnectionFactory normalFactory = new ActiveMQConnectionFactory(); normalFactory.setBrokerURL(bindAddress); for (int i = 0; i < COUNT; i++) { - if (i % 1000 == 0) { + if (i % 100 == 0) { LOG.info(new Integer(i).toString()); } @@ -86,6 +86,7 @@ public class ConnectionPerMessageTest extends EmbeddedBrokerTestSupport { protected BrokerService createBroker() throws Exception { BrokerService answer = new BrokerService(); + answer.setDeleteAllMessagesOnStartup(true); answer.setUseJmx(false); answer.setPersistent(isPersistent()); answer.addConnector(bindAddress); http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java index 5bd9b8a..33963b7 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java @@ -58,7 +58,7 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientB = createConsumer("BrokerB", dest); MessageConsumer clientC = createConsumer("BrokerC", dest); -// let consumers propogate around the network + //let consumers propagate around the network Thread.sleep(2000); // Send messages sendMessages("BrokerA", dest, MESSAGE_COUNT); @@ -143,7 +143,7 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientB = createConsumer("BrokerB", dest); MessageConsumer clientC = createConsumer("BrokerC", dest); -// let consumers propogate around the network + //let consumers propagate around the network Thread.sleep(2000); // Send messages sendMessages("BrokerA", dest, MESSAGE_COUNT); @@ -182,7 +182,7 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { MessageConsumer clientB = createConsumer("BrokerB", dest); MessageConsumer clientC = createConsumer("BrokerC", dest); -// let consumers propogate around the network + //let consumers propagate around the network Thread.sleep(2000); // Send messages @@ -254,7 +254,7 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { // default (true) is present in a matching destination policy entry int networkTTL = 2; boolean conduitSubs = true; - // Setup broker networks + // Setup ring broker networks bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs); bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduitSubs); bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, networkTTL, conduitSubs); @@ -307,7 +307,7 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { public void testAllConnectedBrokerNetworkDurableSubTTL() throws Exception { int networkTTL = 2; boolean conduitSubs = true; - // Setup broker networks + // Setup ring broker network bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs); bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduitSubs); bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, networkTTL, conduitSubs); @@ -396,6 +396,11 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport { createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options)); } + @Override + protected void configureBroker(BrokerService broker) { + broker.setBrokerId(broker.getBrokerName()); + } + public static Test suite() { return suite(ThreeBrokerTopicNetworkTest.class); }
