https://issues.apache.org/jira/browse/AMQ-4485 - resolve - non transacitonal work interleaved with transactional work needs to check for and integrate with cursor update ordering. Key difference in latest test is 3/4 of messages getting produced via network connectors, hense not in transactions
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dece1fce Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dece1fce Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dece1fce Branch: refs/heads/activemq-5.9 Commit: dece1fceceff41240236d352d64eb6413f05618d Parents: e179a72 Author: gtully <[email protected]> Authored: Mon Nov 18 13:04:44 2013 +0000 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 12:09:39 2014 -0400 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 95 +++-- .../region/cursors/AbstractStoreCursor.java | 9 +- ...XBrokersWithNDestsFanoutTransactionTest.java | 353 +++++++++++++++++++ 3 files changed, 427 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/dece1fce/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 5b50f1c..31b60b8 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.ResourceAllocationException; +import javax.transaction.xa.XAException; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -714,7 +715,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>(); - private volatile LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>(); + private LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>(); // roll up all message sends class SendSync extends Synchronization { @@ -742,40 +743,31 @@ public class Queue extends BaseDestination implements Task, UsageListener { @Override public void beforeCommit() throws Exception { - synchronized (sendLock) { + synchronized (orderIndexUpdates) { orderIndexUpdates.addLast(transaction); } } @Override public void afterCommit() throws Exception { - LinkedList<Transaction> orderedWork = new LinkedList<Transaction>();; - // use existing object to sync orderIndexUpdates that can be reassigned - synchronized (sendLock) { - Transaction next = orderIndexUpdates.peek(); - while( next!=null && next.isCommitted() ) { - orderedWork.addLast(orderIndexUpdates.removeFirst()); - next = orderIndexUpdates.peek(); - } - } - // do the ordered work - if (!orderedWork.isEmpty()) { - - ArrayList<SendSync> syncs = new ArrayList<SendSync>(orderedWork.size());; - for (Transaction tx : orderedWork) { - syncs.add(sendSyncs.remove(tx)); - } - sendLock.lockInterruptibly(); - try { - for (SendSync sync : syncs) { - sync.processSend(); + ArrayList<SendSync> syncs = new ArrayList<SendSync>(200); + sendLock.lockInterruptibly(); + try { + synchronized (orderIndexUpdates) { + Transaction next = orderIndexUpdates.peek(); + while( next!=null && next.isCommitted() ) { + syncs.add(sendSyncs.remove(orderIndexUpdates.removeFirst())); + next = orderIndexUpdates.peek(); } - } finally { - sendLock.unlock(); } for (SendSync sync : syncs) { - sync.processSent(); + sync.processSend(); } + } finally { + sendLock.unlock(); + } + for (SendSync sync : syncs) { + sync.processSent(); } } @@ -815,9 +807,51 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } + class OrderedNonTransactionWorkTx extends Transaction { + + @Override + public void commit(boolean onePhase) throws XAException, IOException { + } + + @Override + public void rollback() throws XAException, IOException { + } + + @Override + public int prepare() throws XAException, IOException { + return 0; + } + + @Override + public TransactionId getTransactionId() { + return null; + } + + @Override + public Logger getLog() { + return null; + } + + @Override + public boolean isCommitted() { + return true; + } + + @Override + public void addSynchronization(Synchronization s) { + try { + s.beforeCommit(); + } catch (Exception e) { + LOG.error("Failed to add not transactional message to orderedWork", e); + } + } + } + // called while holding the sendLock private void registerSendSync(Message message, ConnectionContext context) { - final Transaction transaction = context.getTransaction(); + final Transaction transaction = + message.isInTransaction() ? context.getTransaction() + : new OrderedNonTransactionWorkTx(); Queue.SendSync currentSync = sendSyncs.get(transaction); if (currentSync == null) { currentSync = new Queue.SendSync(transaction); @@ -831,6 +865,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { Exception { final ConnectionContext context = producerExchange.getConnectionContext(); Future<Object> result = null; + boolean needsOrderingWithTransactions = context.isInTransaction(); producerExchange.incrementSend(); checkUsage(context, producerExchange, message); @@ -847,7 +882,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { message.clearMarshalledState(); } } - if (context.isInTransaction()) { + // did a transaction commit beat us to the index? + synchronized (orderIndexUpdates) { + needsOrderingWithTransactions |= !orderIndexUpdates.isEmpty(); + } + if (needsOrderingWithTransactions ) { // If this is a transacted message.. increase the usage now so that // a big TX does not blow up // our memory. This increment is decremented once the tx finishes.. @@ -862,7 +901,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } finally { sendLock.unlock(); } - if (!context.isInTransaction()) { + if (!needsOrderingWithTransactions) { messageSent(context, message); } if (result != null && !result.isCancelled()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/dece1fce/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 624c4f3..495d365 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -21,6 +21,7 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.MessageRecoveryListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i private boolean storeHasMessages = false; protected int size; private MessageId lastCachedId; + private TransactionId lastTx; protected boolean hadSpace = false; protected AbstractStoreCursor(Destination destination) { @@ -176,6 +178,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i if (isCacheEnabled()) { if (recoverMessage(node.getMessage(),true)) { lastCachedId = node.getMessageId(); + lastTx = node.getMessage().getTransactionId(); } else { // failed to recover, possible duplicate from concurrent dispatchPending, // lets not recover further in case of out of order @@ -190,9 +193,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i setCacheEnabled(false); // sync with store on disabling the cache if (lastCachedId != null) { - LOG.trace(this + "{} - disabling cache, lastCachedId: {} current node Id: {} batchList size: {}", new Object[]{ this, lastCachedId, node.getMessageId(), batchList.size() }); + LOG.debug("{} - disabling cache, lastCachedId: {} last-tx: {} current node Id: {} node-tx: {} batchList size: {}", + new Object[]{ this, lastCachedId, lastTx, node.getMessageId(), node.getMessage().getTransactionId(), batchList.size() }); setBatch(lastCachedId); lastCachedId = null; + lastTx = null; } } this.storeHasMessages = true; @@ -287,7 +292,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public String toString() { return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled() - + ",maxBatchSize:" + maxBatchSize; + + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace(); } protected abstract void doFillBatch() throws Exception; http://git-wip-us.apache.org/repos/asf/activemq/blob/dece1fce/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java new file mode 100644 index 0000000..c2cf53a --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java @@ -0,0 +1,353 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Vector; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.QueueConnection; +import javax.jms.QueueReceiver; +import javax.jms.QueueSession; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.TimeUtils; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends JmsMultipleBrokersTestSupport { + static final String payload = new String(new byte[10 * 1024]); + private static final Logger LOG = LoggerFactory.getLogger(AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.class); + final int portBase = 61600; + final int numBrokers = 4; + final int numProducers = 10; + final int numMessages = 800; + final int consumerSleepTime = 20; + StringBuilder brokersUrl = new StringBuilder(); + HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>(); + private ArrayList<Throwable> exceptions = new ArrayList<Throwable>(); + + protected void buildUrlList() throws Exception { + for (int i = 0; i < numBrokers; i++) { + brokersUrl.append("tcp://localhost:" + (portBase + i)); + if (i != numBrokers - 1) { + brokersUrl.append(','); + } + } + } + + protected BrokerService createBroker(int brokerid) throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(true); + broker.setDeleteAllMessagesOnStartup(true); + broker.getManagementContext().setCreateConnector(false); + + + broker.setUseJmx(true); + broker.setBrokerName("B" + brokerid); + broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid))); + + addNetworkConnector(broker); + broker.setSchedulePeriodForDestinationPurge(0); + broker.getSystemUsage().setSendFailIfNoSpace(true); + broker.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024); + + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setExpireMessagesPeriod(0); + policyEntry.setQueuePrefetch(1000); + policyEntry.setMemoryLimit(1024 * 1024l); + policyEntry.setOptimizedDispatch(false); + policyEntry.setProducerFlowControl(false); + policyEntry.setEnableAudit(true); + policyEntry.setUseCache(true); + policyMap.put(new ActiveMQQueue("GW.>"), policyEntry); + broker.setDestinationPolicy(policyMap); + + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(false); + + brokers.put(broker.getBrokerName(), new BrokerItem(broker)); + return broker; + } + + private void addNetworkConnector(BrokerService broker) throws Exception { + StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString()); + networkConnectorUrl.append(')'); + + for (int i = 0; i < 2; i++) { + NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString())); + nc.setName("Bridge-" + i); + nc.setNetworkTTL(1); + nc.setDecreaseNetworkConsumerPriority(true); + nc.setDynamicOnly(true); + nc.setPrefetchSize(100); + nc.setDynamicallyIncludedDestinations( + Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")})); + broker.addNetworkConnector(nc); + } + } + + public void testBrokers() throws Exception { + + buildUrlList(); + + for (int i = 0; i < numBrokers; i++) { + createBroker(i); + } + + startAllBrokers(); + waitForBridgeFormation(numBrokers - 1); + + verifyPeerBrokerInfos(numBrokers - 1); + + + final List<ConsumerState> consumerStates = startAllGWConsumers(numBrokers); + + startAllGWFanoutConsumers(numBrokers); + + LOG.info("Waiting for percolation of consumers.."); + TimeUnit.SECONDS.sleep(5); + + LOG.info("Produce mesages.."); + long startTime = System.currentTimeMillis(); + + // produce + produce(numMessages); + + assertTrue("Got all sent", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + for (ConsumerState tally : consumerStates) { + final int expected = numMessages * (tally.destination.isComposite() ? tally.destination.getCompositeDestinations().length : 1); + LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get()); + if (tally.accumulator.get() != expected) { + LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected); + return false; + } + LOG.info("got tally on " + tally.brokerName); + } + return true; + } + }, 1000 * 60 * 1000l)); + + assertTrue("No exceptions:" + exceptions, exceptions.isEmpty()); + + LOG.info("done"); + long duration = System.currentTimeMillis() - startTime; + LOG.info("Duration:" + TimeUtils.printDuration(duration)); + } + + private void startAllGWFanoutConsumers(int nBrokers) throws Exception { + + StringBuffer compositeDest = new StringBuffer(); + for (int k = 0; k < nBrokers; k++) { + compositeDest.append("GW." + k); + if (k + 1 != nBrokers) { + compositeDest.append(','); + } + } + ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString()); + + for (int id = 0; id < nBrokers; id++) { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); + connectionFactory.setWatchTopicAdvisories(false); + + QueueConnection queueConnection = connectionFactory.createQueueConnection(); + queueConnection.start(); + + final QueueSession queueSession = queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED); + + final MessageProducer producer = queueSession.createProducer(compositeQ); + queueSession.createReceiver(new ActiveMQQueue("IN")).setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + producer.send(message); + queueSession.commit(); + } catch (Exception e) { + LOG.error("Failed to fanout to GW: " + message, e); + } + + } + }); + } + } + + private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception { + List<ConsumerState> consumerStates = new LinkedList<ConsumerState>(); + for (int id = 0; id < nBrokers; id++) { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); + connectionFactory.setWatchTopicAdvisories(false); + + QueueConnection queueConnection = connectionFactory.createQueueConnection(); + queueConnection.start(); + + final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQQueue destination = new ActiveMQQueue("GW." + id); + QueueReceiver queueReceiver = queueSession.createReceiver(destination); + + final ConsumerState consumerState = new ConsumerState(); + consumerState.brokerName = ((ActiveMQConnection) queueConnection).getBrokerName(); + consumerState.receiver = queueReceiver; + consumerState.destination = destination; + for (int j = 0; j < numMessages * (consumerState.destination.isComposite() ? consumerState.destination.getCompositeDestinations().length : 1); j++) { + consumerState.expected.add(j); + } + + if (!accumulators.containsKey(destination)) { + accumulators.put(destination, new AtomicInteger(0)); + } + consumerState.accumulator = accumulators.get(destination); + + queueReceiver.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + if (consumerSleepTime > 0) { + TimeUnit.MILLISECONDS.sleep(consumerSleepTime); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + try { + consumerState.accumulator.incrementAndGet(); + try { + consumerState.expected.remove(((ActiveMQMessage) message).getProperty("NUM")); + } catch (IOException e) { + e.printStackTrace(); + } + } catch (Exception e) { + LOG.error("Failed to commit slow receipt of " + message, e); + } + } + }); + + consumerStates.add(consumerState); + + } + return consumerStates; + } + + private void produce(int numMessages) throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(numProducers); + final AtomicInteger toSend = new AtomicInteger(numMessages); + for (int i = 1; i <= numProducers; i++) { + final int id = i % numBrokers; + executorService.execute(new Runnable() { + @Override + public void run() { + try { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")"); + connectionFactory.setWatchTopicAdvisories(false); + QueueConnection queueConnection = connectionFactory.createQueueConnection(); + queueConnection.start(); + QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = queueSession.createProducer(null); + int val = 0; + while ((val = toSend.decrementAndGet()) >= 0) { + + ActiveMQQueue compositeQ = new ActiveMQQueue("IN"); + LOG.info("Send to: " + ((ActiveMQConnection) queueConnection).getBrokerName() + ", " + val + ", dest:" + compositeQ); + Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + val + " payload:" + payload); + textMessage.setIntProperty("NUM", val); + producer.send(compositeQ, textMessage); + } + queueConnection.close(); + + } catch (Throwable throwable) { + throwable.printStackTrace(); + exceptions.add(throwable); + } + } + }); + } + } + + private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception { + final BrokerService broker = brokerItem.broker; + final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker(); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); + return max == regionBroker.getPeerBrokerInfos().length; + } + }); + LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length); + List<String> missing = new ArrayList<String>(); + for (int i = 0; i < max; i++) { + missing.add("B" + i); + } + if (max != regionBroker.getPeerBrokerInfos().length) { + for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) { + LOG.info(info.getBrokerName()); + missing.remove(info.getBrokerName()); + } + LOG.info("Broker infos off.." + missing); + } + assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length); + } + + private void verifyPeerBrokerInfos(final int max) throws Exception { + Collection<BrokerItem> brokerList = brokers.values(); + for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) { + verifyPeerBrokerInfo(i.next(), max); + } + } + + protected void tearDown() throws Exception { + super.tearDown(); + } + + class ConsumerState { + AtomicInteger accumulator; + String brokerName; + QueueReceiver receiver; + ActiveMQDestination destination; + Vector<Integer> expected = new Vector<Integer>(); + } +}
