Repository: activemq Updated Branches: refs/heads/trunk 11185c205 -> 6348d1197
https://issues.apache.org/jira/browse/AMQ-5266 - fix ordering of concurrent transaction completion in jdbc store, avoid skipped message dispatch. additional test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6348d119 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6348d119 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6348d119 Branch: refs/heads/trunk Commit: 6348d11976fd16e74e0b36b5e76e8ba2bf48db4a Parents: 11185c2 Author: gtully <[email protected]> Authored: Thu Jul 10 12:32:40 2014 +0100 Committer: gtully <[email protected]> Committed: Thu Jul 10 12:33:29 2014 +0100 ---------------------------------------------------------------------- .../apache/activemq/store/jdbc/JDBCAdapter.java | 2 +- .../activemq/store/jdbc/JDBCMessageStore.java | 48 +- .../store/jdbc/JDBCPersistenceAdapter.java | 4 +- .../store/jdbc/JDBCTopicMessageStore.java | 4 +- .../store/jdbc/JdbcMemoryTransactionStore.java | 4 +- .../apache/activemq/store/jdbc/Statements.java | 4 +- .../activemq/store/jdbc/TransactionContext.java | 11 +- .../store/jdbc/adapter/DefaultJDBCAdapter.java | 7 +- .../org/apache/activemq/bugs/AMQ5266Test.java | 539 +++++++++++++++++++ 9 files changed, 600 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6348d119/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index 912808d..1d368b6 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -87,7 +87,7 @@ public interface JDBCAdapter { int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException; - void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, boolean isPrioritizeMessages, JDBCMessageRecoveryListener listener) throws Exception; + void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long maxSeq, long nextSeq, long priority, int maxReturned, boolean isPrioritizeMessages, JDBCMessageRecoveryListener listener) throws Exception; long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException; http://git-wip-us.apache.org/repos/asf/activemq/blob/6348d119/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 968b928..4badb09 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.sql.SQLException; import java.util.Iterator; import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -70,6 +72,7 @@ public class JDBCMessageStore extends AbstractMessageStore { protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1); final Set<Long> recoveredAdditions = new LinkedHashSet<Long>(); protected ActiveMQMessageAudit audit; + protected final List<Long> pendingAdditions = new LinkedList<Long>(); public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException { super(destination); @@ -108,9 +111,7 @@ public class JDBCMessageStore extends AbstractMessageStore { } return; } - - long sequenceId = persistenceAdapter.getNextSequenceId(); - + // Serialize the Message.. byte data[]; try { @@ -122,7 +123,14 @@ public class JDBCMessageStore extends AbstractMessageStore { // Get a connection and insert the message into the DB. TransactionContext c = persistenceAdapter.getTransactionContext(context); - try { + long sequenceId; + synchronized (pendingAdditions) { + sequenceId = persistenceAdapter.getNextSequenceId(); + if (message.isInTransaction()) { + trackPendingSequence(c, sequenceId); + } + } + try { adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(), this.isPrioritizedMessages() ? message.getPriority() : 0, context != null ? context.getXid() : null); } catch (SQLException e) { @@ -132,7 +140,28 @@ public class JDBCMessageStore extends AbstractMessageStore { c.close(); } message.getMessageId().setEntryLocator(sequenceId); - onAdd(messageId, sequenceId, message.getPriority()); + onAdd(message, sequenceId, message.getPriority()); + } + + // jdbc commit order is random with concurrent connections - limit scan to lowest pending + private void trackPendingSequence(final TransactionContext transactionContext, final long sequenceId) { + synchronized (pendingAdditions) { pendingAdditions.add(sequenceId); } + transactionContext.onCompletion(new Runnable() { + public void run() { + synchronized (pendingAdditions) { pendingAdditions.remove(sequenceId); } + } + }); + } + + private long minPendingSequeunceId() { + synchronized (pendingAdditions) { + if (!pendingAdditions.isEmpty()) { + return pendingAdditions.get(0); + } else { + // nothing pending, ensure scan is limited to current state + return persistenceAdapter.sequenceGenerator.getLastSequenceId() + 1; + } + } } @Override @@ -148,8 +177,9 @@ public class JDBCMessageStore extends AbstractMessageStore { } } - protected void onAdd(MessageId messageId, long sequenceId, byte priority) { - if (lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get()) { + protected void onAdd(Message message, long sequenceId, byte priority) { + if (message.getTransactionId() != null && message.getTransactionId().isXATransaction() + && lastRecoveredSequenceId.get() > 0 && sequenceId < lastRecoveredSequenceId.get()) { recoveredAdditions.add(sequenceId); } } @@ -232,7 +262,7 @@ public class JDBCMessageStore extends AbstractMessageStore { c = persistenceAdapter.getTransactionContext(); adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { - Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); + Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); return listener.recoverMessage(msg); } @@ -303,7 +333,7 @@ public class JDBCMessageStore extends AbstractMessageStore { } } } - adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), + adapter.doRecoverNextMessages(c, destination, minPendingSequeunceId(), lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/6348d119/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index a3a8250..b4fb5d5 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -744,9 +744,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements } public long getNextSequenceId() { - synchronized(sequenceGenerator) { - return sequenceGenerator.getNextSequenceId(); - } + return sequenceGenerator.getNextSequenceId(); } public int getMaxRows() { http://git-wip-us.apache.org/repos/asf/activemq/blob/6348d119/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index 1efdc63..1841f11 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -316,14 +316,14 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName)); } - protected void onAdd(MessageId messageId, long sequenceId, byte priority) { + protected void onAdd(Message message, long sequenceId, byte priority) { // update last recovered state for (LastRecovered last : subscriberLastRecoveredMap.values()) { last.updateStored(sequenceId, priority); } sequenceIdCacheSizeLock.writeLock().lock(); try { - sequenceIdCache.put(messageId, new long[]{sequenceId, priority}); + sequenceIdCache.put(message.getMessageId(), new long[]{sequenceId, priority}); } finally { sequenceIdCacheSizeLock.writeLock().unlock(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/6348d119/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java index 135528e..4128eef 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java @@ -107,7 +107,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { Message message = addMessageCommand.getMessage(); jdbcPersistenceAdapter.commitAdd(context, message.getMessageId()); ((JDBCMessageStore)addMessageCommand.getMessageStore()).onAdd( - message.getMessageId(), + message, (Long)message.getMessageId().getEntryLocator(), message.getPriority()); @@ -187,7 +187,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { @Override public void run(ConnectionContext context) throws IOException { ((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId()); - ((JDBCMessageStore)messageStore).onAdd(message.getMessageId(), ((Long)message.getMessageId().getEntryLocator()).longValue(), message.getPriority()); + ((JDBCMessageStore)messageStore).onAdd(message, ((Long)message.getMessageId().getEntryLocator()).longValue(), message.getPriority()); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/6348d119/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java index fc80465..a595f33 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -498,7 +498,7 @@ public class Statements { public String getFindNextMessagesStatement() { if (findNextMessagesStatement == null) { findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() - + " WHERE CONTAINER=? AND ID > ? AND XID IS NULL ORDER BY ID"; + + " WHERE CONTAINER=? AND ID > ? AND ID < ? AND XID IS NULL ORDER BY ID"; } return findNextMessagesStatement; } @@ -511,7 +511,7 @@ public class Statements { findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() + " WHERE CONTAINER=?" + " AND XID IS NULL" - + " AND ((ID > ? AND PRIORITY = ?) OR PRIORITY < ?)" + + " AND ((ID > ? AND ID < ? AND PRIORITY = ?) OR PRIORITY < ?)" + " ORDER BY PRIORITY DESC, ID"; } return findNextMessagesByPriorityStatement; http://git-wip-us.apache.org/repos/asf/activemq/blob/6348d119/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java index 8b4ac97..5e5d556 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java @@ -21,6 +21,8 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; +import java.util.LinkedList; +import java.util.List; import javax.sql.DataSource; @@ -44,7 +46,8 @@ public class TransactionContext { private PreparedStatement updateLastAckStatement; // a cheap dirty level that we can live with private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED; - + private LinkedList<Runnable> completions = new LinkedList<Runnable>(); + public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException { this.persistenceAdapter = persistenceAdapter; this.dataSource = persistenceAdapter.getDataSource(); @@ -154,6 +157,9 @@ public class TransactionContext { } finally { connection = null; } + for (Runnable completion: completions) { + completion.run(); + } } } } @@ -248,4 +254,7 @@ public class TransactionContext { this.transactionIsolation = transactionIsolation; } + public void onCompletion(Runnable runnable) { + completions.add(runnable); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6348d119/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index dc59621..7a85de3 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -1077,7 +1077,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { return result; } - public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, + public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long maxSeq, long lastRecoveredSeq, long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception { PreparedStatement s = null; ResultSet rs = null; @@ -1090,10 +1090,11 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } s.setMaxRows(Math.min(maxReturned * 2, maxRows)); s.setString(1, destination.getQualifiedName()); - s.setLong(2, nextSeq); + s.setLong(2, lastRecoveredSeq); + s.setLong(3, maxSeq); if (isPrioritizedMessages) { - s.setLong(3, priority); s.setLong(4, priority); + s.setLong(5, priority); } rs = s.executeQuery(); int count = 0; http://git-wip-us.apache.org/repos/asf/activemq/blob/6348d119/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java new file mode 100644 index 0000000..cc2bb3c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java @@ -0,0 +1,539 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; + +/** + * Stuck messages test client. + * <p/> + * Will kick of publisher and consumer simultaneously, and will usually result in stuck messages on the queue. + */ +public class AMQ5266Test { + static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class); + String activemqURL = "tcp://localhost:61617"; + BrokerService brokerService; + private EmbeddedDataSource dataSource; + + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + + dataSource = new EmbeddedDataSource(); + dataSource.setDatabaseName("target/derbyDb"); + dataSource.setCreateDatabase("create"); + + JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter(); + persistenceAdapter.setDataSource(dataSource); + brokerService.setPersistenceAdapter(persistenceAdapter); + brokerService.setDeleteAllMessagesOnStartup(true); + + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setEnableAudit(false); + defaultEntry.setUseCache(false); + defaultEntry.setMaxPageSize(1000); + defaultEntry.setOptimizedDispatch(false); + defaultEntry.setMemoryLimit(1024 * 1024); + defaultEntry.setExpireMessagesPeriod(0); + policyMap.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(policyMap); + + brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024); + + TransportConnector transportConnector = brokerService.addConnector("tcp://0.0.0.0:0"); + brokerService.start(); + activemqURL = transportConnector.getPublishableConnectString(); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + } + try { + dataSource.setShutdownDatabase("shutdown"); + dataSource.getConnection(); + } catch (Exception ignored) {} + } + + @Test + public void test() throws Exception { + + String activemqQueues = "activemq,activemq2";//,activemq3,activemq4,activemq5,activemq6,activemq7,activemq8,activemq9"; + + int publisherMessagesPerThread = 1000; + int publisherThreadCount = 5; + + int consumerThreadsPerQueue = 5; + int consumerBatchSize = 25; + int consumerWaitForConsumption = 5 * 60 * 1000; + + ExportQueuePublisher publisher = null; + ExportQueueConsumer consumer = null; + + LOG.info("Publisher will publish " + (publisherMessagesPerThread * publisherThreadCount) + " messages to each queue specified."); + LOG.info("\nBuilding Publisher..."); + + publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount); + + LOG.info("Building Consumer..."); + + consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount); + + + LOG.info("Starting Publisher..."); + + publisher.start(); + + LOG.info("Starting Consumer..."); + + consumer.start(); + + int distinctPublishedCount = 0; + + + LOG.info("Waiting For Publisher Completion..."); + + publisher.waitForCompletion(); + + distinctPublishedCount = publisher.getIDs().size(); + + LOG.info("Publisher Complete. Distinct IDs Published: " + distinctPublishedCount); + + + long endWait = System.currentTimeMillis() + consumerWaitForConsumption; + + + while (!consumer.completed() && System.currentTimeMillis() < endWait) { + try { + int secs = (int) (endWait - System.currentTimeMillis()) / 1000; + LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs"); + DefaultJDBCAdapter.dumpTables(dataSource.getConnection()); + Thread.sleep(10000); + } catch (Exception e) { + } + } + + LOG.info("\nConsumer Complete. Shutting Down."); + + consumer.shutdown(); + + LOG.info("Consumer Stats:"); + + for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) { + + List<String> idList = entry.getValue(); + + int distinctConsumed = new TreeSet<String>(idList).size(); + + StringBuilder sb = new StringBuilder(); + sb.append(" Queue: " + entry.getKey() + + " -> Total Messages Consumed: " + idList.size() + + ", Distinct IDs Consumed: " + distinctConsumed); + + int diff = distinctPublishedCount - distinctConsumed; + sb.append(" ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) "); + LOG.info(sb.toString()); + + assertEquals("expect to get all messages!", 0, diff); + + } + } + + public class ExportQueuePublisher { + + private final String amqUser = ActiveMQConnection.DEFAULT_USER; + private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; + private ActiveMQConnectionFactory connectionFactory = null; + private String activemqURL = null; + private String activemqQueues = null; + // Collection of distinct IDs that the publisher has published. + // After a message is published, its UUID will be written to this list for tracking. + // This list of IDs (or distinct count) will be used to compare to the consumed list of IDs. + private Set<String> ids = Collections.synchronizedSet(new TreeSet<String>()); + private List<PublisherThread> threads; + + public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception { + + this.activemqURL = activemqURL; + this.activemqQueues = activemqQueues; + + threads = new ArrayList<PublisherThread>(); + + // Build the threads and tell them how many messages to publish + for (int i = 0; i < threadCount; i++) { + PublisherThread pt = new PublisherThread(messagesPerThread); + threads.add(pt); + } + } + + public Set<String> getIDs() { + return ids; + } + + // Kick off threads + public void start() throws Exception { + + for (PublisherThread pt : threads) { + pt.start(); + } + } + + // Wait for threads to complete. They will complete once they've published all of their messages. + public void waitForCompletion() throws Exception { + + for (PublisherThread pt : threads) { + pt.join(); + pt.close(); + } + } + + private Session newSession(QueueConnection queueConnection) throws Exception { + return queueConnection.createSession(true, Session.SESSION_TRANSACTED); + } + + private QueueConnection newQueueConnection() throws Exception { + + if (connectionFactory == null) { + connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); + } + + // Set the redelivery count to -1 (infinite), or else messages will start dropping + // after the queue has had a certain number of failures (default is 6) + RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); + policy.setMaximumRedeliveries(-1); + + QueueConnection amqConnection = connectionFactory.createQueueConnection(); + amqConnection.start(); + return amqConnection; + } + + private class PublisherThread extends Thread { + + private int count; + private QueueConnection qc; + private Session session; + private MessageProducer mp; + + private PublisherThread(int count) throws Exception { + + this.count = count; + + // Each Thread has its own Connection and Session, so no sync worries + qc = newQueueConnection(); + session = newSession(qc); + + // In our code, when publishing to multiple queues, + // we're using composite destinations like below + Queue q = new ActiveMQQueue(activemqQueues); + mp = session.createProducer(q); + } + + public void run() { + + try { + + // Loop until we've published enough messages + while (count-- > 0) { + + TextMessage tm = session.createTextMessage("test"); + String id = UUID.randomUUID().toString(); + tm.setStringProperty("KEY", id); + ids.add(id); // keep track of the key to compare against consumer + + mp.send(tm); + session.commit(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + // Called by waitForCompletion + public void close() { + + try { + mp.close(); + } catch (Exception e) { + } + + try { + session.close(); + } catch (Exception e) { + } + + try { + qc.close(); + } catch (Exception e) { + } + } + } + + } + + public class ExportQueueConsumer { + + private final String amqUser = ActiveMQConnection.DEFAULT_USER; + private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD; + private final int totalToExpect; + private ActiveMQConnectionFactory connectionFactory = null; + private String activemqURL = null; + private String activemqQueues = null; + private String[] queues = null; + // Map of IDs that were consumed, keyed by queue name. + // We'll compare these against what was published to know if any got stuck or dropped. + private Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>(); + private Map<String, List<ConsumerThread>> threads; + + public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception { + + this.activemqURL = activemqURL; + this.activemqQueues = activemqQueues; + this.totalToExpect = totalToExpect; + + queues = this.activemqQueues.split(","); + + for (int i = 0; i < queues.length; i++) { + queues[i] = queues[i].trim(); + } + + threads = new HashMap<String, List<ConsumerThread>>(); + + // For each queue, create a list of threads and set up the list of ids + for (String q : queues) { + + List<ConsumerThread> list = new ArrayList<ConsumerThread>(); + + idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>())); + + for (int i = 0; i < threadsPerQueue; i++) { + list.add(new ConsumerThread(q, batchSize)); + } + + threads.put(q, list); + } + } + + public Map<String, List<String>> getIDs() { + return idsByQueue; + } + + // Start the threads + public void start() throws Exception { + + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + ct.start(); + } + } + } + + // Tell the threads to stop + // Then wait for them to stop + public void shutdown() throws Exception { + + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + ct.shutdown(); + } + } + + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + ct.join(); + } + } + } + + private Session newSession(QueueConnection queueConnection) throws Exception { + return queueConnection.createSession(true, Session.SESSION_TRANSACTED); + } + + private QueueConnection newQueueConnection() throws Exception { + + if (connectionFactory == null) { + connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL); + } + + // Set the redelivery count to -1 (infinite), or else messages will start dropping + // after the queue has had a certain number of failures (default is 6) + RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy(); + policy.setMaximumRedeliveries(-1); + + QueueConnection amqConnection = connectionFactory.createQueueConnection(); + amqConnection.start(); + return amqConnection; + } + + public boolean completed() { + for (List<ConsumerThread> list : threads.values()) { + + for (ConsumerThread ct : list) { + + if (ct.isAlive()) { + LOG.info("thread for {} is still alive.", ct.qName); + return false; + } + } + } + return true; + } + + private class ConsumerThread extends Thread { + + private int batchSize; + private QueueConnection qc; + private Session session; + private MessageConsumer mc; + private List<String> idList; + private boolean shutdown = false; + private String qName; + + private ConsumerThread(String queueName, int batchSize) throws Exception { + + this.batchSize = batchSize; + + // Each thread has its own connection and session + qName = queueName; + qc = newQueueConnection(); + session = newSession(qc); + Queue q = session.createQueue(queueName); + mc = session.createConsumer(q); + + idList = idsByQueue.get(queueName); + } + + public void run() { + + try { + + int count = 0; + + // Keep reading as long as it hasn't been told to shutdown + while (!shutdown) { + + if (idList.size() >= totalToExpect) { + LOG.info("Got {} for q: {}", +idList.size(), qName); + break; + } + Message m = mc.receive(4000); + + if (m != null) { + + // We received a non-null message, add the ID to our list + + idList.add(m.getStringProperty("KEY")); + + count++; + + // If we've reached our batch size, commit the batch and reset the count + + if (count == batchSize) { + session.commit(); + count = 0; + } + } else { + + // We didn't receive anything this time, commit any current batch and reset the count + + session.commit(); + count = 0; + + // Sleep a little before trying to read after not getting a message + + try { + LOG.info("did not receive on {}, current count: {}", qName, idList.size()); + //sleep(3000); + } catch (Exception e) { + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + + // Once we exit, close everything + close(); + } + } + + public void shutdown() { + shutdown = true; + } + + public void close() { + + try { + mc.close(); + } catch (Exception e) { + } + + try { + session.close(); + } catch (Exception e) { + } + + try { + qc.close(); + } catch (Exception e) { + + } + } + } + } +}
