rework cursor store sync w.r.t. index order. resolve issues with skipped dispatch and duplicate dispatch. https://issues.apache.org/jira/browse/AMQ-4485 https://issues.apache.org/jira/browse/AMQ-5266
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/54e2e3be Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/54e2e3be Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/54e2e3be Branch: refs/heads/trunk Commit: 54e2e3bef290d7455d9d1ba3420d12dc4805b339 Parents: b2afb8c Author: gtully <gary.tu...@gmail.com> Authored: Fri Aug 29 22:24:38 2014 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Sat Aug 30 00:51:21 2014 +0100 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 212 +++------ .../activemq/broker/region/RegionBroker.java | 1 + .../cursors/AbstractPendingMessageCursor.java | 6 +- .../region/cursors/AbstractStoreCursor.java | 101 ++-- .../cursors/FilePendingMessageCursor.java | 4 +- .../region/cursors/PendingMessageCursor.java | 4 +- .../region/cursors/QueueStorePrefetch.java | 1 + .../cursors/StoreDurableSubscriberCursor.java | 3 +- .../broker/region/cursors/StoreQueueCursor.java | 6 +- .../region/cursors/VMPendingMessageCursor.java | 4 +- .../activemq/store/AbstractMessageStore.java | 7 + .../apache/activemq/store/IndexListener.java | 47 ++ .../org/apache/activemq/store/MessageStore.java | 2 + .../activemq/store/ProxyMessageStore.java | 5 + .../activemq/store/ProxyTopicMessageStore.java | 5 + .../store/memory/MemoryMessageStore.java | 6 + .../org/apache/activemq/command/MessageId.java | 10 + .../activemq/store/jdbc/JDBCMessageStore.java | 47 +- .../store/jdbc/JDBCPersistenceAdapter.java | 4 +- .../store/jdbc/JdbcMemoryTransactionStore.java | 8 +- .../store/jdbc/adapter/DefaultJDBCAdapter.java | 12 +- .../activemq/store/kahadb/KahaDBStore.java | 82 +++- .../store/kahadb/KahaDBTransactionStore.java | 15 +- .../activemq/store/kahadb/MessageDatabase.java | 190 +++++--- .../store/kahadb/disk/index/BTreeIndex.java | 6 +- .../store/kahadb/disk/index/BTreeNode.java | 19 +- 
.../store/kahadb/disk/index/BTreeIndexTest.java | 29 ++ .../org/apache/activemq/leveldb/DBManager.scala | 6 + .../apache/activemq/leveldb/LevelDBStore.scala | 35 +- .../StoreQueueCursorNoDuplicateTest.java | 1 + .../activemq/bugs/AMQ4485LowLimitTest.java | 462 +++++++++++++++++++ .../org/apache/activemq/bugs/AMQ5212Test.java | 21 + .../org/apache/activemq/bugs/AMQ5266Test.java | 134 +++++- .../network/DemandForwardingBridgeTest.java | 1 - .../activemq/usecases/MemoryLimitTest.java | 6 +- 35 files changed, 1152 insertions(+), 350 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index af7cfe9..d1605e2 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -30,13 +30,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -47,7 +45,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.ResourceAllocationException; -import javax.transaction.xa.XAException; 
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -75,12 +72,12 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; -import org.apache.activemq.command.TransactionId; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.state.ProducerState; +import org.apache.activemq.store.IndexListener; import org.apache.activemq.store.ListenableFuture; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; @@ -88,7 +85,6 @@ import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.transaction.Transaction; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; import org.apache.activemq.util.BrokerSupport; @@ -101,7 +97,7 @@ import org.slf4j.MDC; * The Queue is a List of MessageEntry objects that are dispatched to matching * subscriptions. 
*/ -public class Queue extends BaseDestination implements Task, UsageListener { +public class Queue extends BaseDestination implements Task, UsageListener, IndexListener { protected static final Logger LOG = LoggerFactory.getLogger(Queue.class); protected final TaskRunnerFactory taskFactory; protected TaskRunner taskRunner; @@ -241,6 +237,9 @@ public class Queue extends BaseDestination implements Task, UsageListener { super(brokerService, store, destination, parentStats); this.taskFactory = taskFactory; this.dispatchSelector = new QueueDispatchSelector(destination); + if (store != null) { + store.registerIndexListener(this); + } } @Override @@ -746,158 +745,81 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>(); - private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>(); - - // roll up all message sends - class SendSync extends Synchronization { - - class MessageContext { - public Message message; - public ConnectionContext context; - - public MessageContext(ConnectionContext context, Message message) { - this.context = context; - this.message = message; - } - } - - final Transaction transaction; - List<MessageContext> additions = new ArrayList<MessageContext>(); - - public SendSync(Transaction transaction) { - this.transaction = transaction; - } - - public void add(ConnectionContext context, Message message) { - additions.add(new MessageContext(context, message)); - } + private final LinkedList<MessageContext> indexOrderedCursorUpdates = new LinkedList<>(); - @Override - public void beforeCommit() throws Exception { - synchronized (orderIndexUpdates) { - orderIndexUpdates.addLast(transaction); - } + @Override + public void onAdd(MessageContext messageContext) { + synchronized (indexOrderedCursorUpdates) { + indexOrderedCursorUpdates.addLast(messageContext); } + } - @Override - public void 
afterCommit() throws Exception { - ArrayList<SendSync> syncs = new ArrayList<SendSync>(200); - sendLock.lockInterruptibly(); - try { - synchronized (orderIndexUpdates) { - Transaction next = orderIndexUpdates.peek(); - while( next!=null && next.isCommitted() ) { - syncs.add(sendSyncs.remove(orderIndexUpdates.removeFirst())); - next = orderIndexUpdates.peek(); + private void doPendingCursorAdditions() throws Exception { + LinkedList<MessageContext> orderedUpdates = new LinkedList<>(); + sendLock.lockInterruptibly(); + try { + synchronized (indexOrderedCursorUpdates) { + MessageContext candidate = indexOrderedCursorUpdates.peek(); + while (candidate != null && candidate.message.getMessageId().getFutureOrSequenceLong() != null) { + candidate = indexOrderedCursorUpdates.removeFirst(); + // check for duplicate adds suppressed by the store + if (candidate.message.getMessageId().getFutureOrSequenceLong() instanceof Long && ((Long)candidate.message.getMessageId().getFutureOrSequenceLong()).compareTo(-1l) == 0) { + LOG.warn("{} messageStore indicated duplicate add attempt for {}, suppressing duplicate dispatch", this, candidate.message.getMessageId()); + } else { + orderedUpdates.add(candidate); } + candidate = indexOrderedCursorUpdates.peek(); } - for (SendSync sync : syncs) { - sync.processSend(); - } - } finally { - sendLock.unlock(); - } - for (SendSync sync : syncs) { - sync.processSent(); } - } - - // called with sendLock - private void processSend() throws Exception { - - for (Iterator<MessageContext> iterator = additions.iterator(); iterator.hasNext(); ) { - MessageContext messageContext = iterator.next(); - // It could take while before we receive the commit - // op, by that time the message could have expired.. 
- if (broker.isExpired(messageContext.message)) { - broker.messageExpired(messageContext.context, messageContext.message, null); - destinationStatistics.getExpired().increment(); - iterator.remove(); - continue; + for (MessageContext messageContext : orderedUpdates) { + if (!cursorAdd(messageContext.message)) { + // cursor suppressed a duplicate + messageContext.duplicate = true; } - sendMessage(messageContext.message); - messageContext.message.decrementReferenceCount(); } + } finally { + sendLock.unlock(); } - - private void processSent() throws Exception { - for (MessageContext messageContext : additions) { + for (MessageContext messageContext : orderedUpdates) { + if (!messageContext.duplicate) { messageSent(messageContext.context, messageContext.message); } - } - - @Override - public void afterRollback() throws Exception { - try { - for (MessageContext messageContext : additions) { - messageContext.message.decrementReferenceCount(); - } - } finally { - sendSyncs.remove(transaction); + if (messageContext.onCompletion != null) { + messageContext.onCompletion.run(); } } + orderedUpdates.clear(); } - class OrderedNonTransactionWorkTx extends Transaction { - - @Override - public void commit(boolean onePhase) throws XAException, IOException { - } - - @Override - public void rollback() throws XAException, IOException { - } - - @Override - public int prepare() throws XAException, IOException { - return 0; - } + final class CursorAddSync extends Synchronization { - @Override - public TransactionId getTransactionId() { - return null; - } + private final MessageContext messageContext; - @Override - public Logger getLog() { - return null; + CursorAddSync(MessageContext messageContext) { + this.messageContext = messageContext; } @Override - public boolean isCommitted() { - return true; - } - - @Override - public void addSynchronization(Synchronization s) { - try { - s.beforeCommit(); - } catch (Exception e) { - LOG.error("Failed to add not transactional message to 
orderedWork", e); + public void afterCommit() throws Exception { + if (store != null && messageContext.message.isPersistent()) { + doPendingCursorAdditions(); + } else { + cursorAdd(messageContext.message); + messageSent(messageContext.context, messageContext.message); } + messageContext.message.decrementReferenceCount(); } - } - // called while holding the sendLock - private void registerSendSync(Message message, ConnectionContext context) { - final Transaction transaction = - message.isInTransaction() ? context.getTransaction() - : new OrderedNonTransactionWorkTx(); - Queue.SendSync currentSync = sendSyncs.get(transaction); - if (currentSync == null) { - currentSync = new Queue.SendSync(transaction); - transaction.addSynchronization(currentSync); - sendSyncs.put(transaction, currentSync); + @Override + public void afterRollback() throws Exception { + messageContext.message.decrementReferenceCount(); } - currentSync.add(context, message); } void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { final ConnectionContext context = producerExchange.getConnectionContext(); ListenableFuture<Object> result = null; - boolean needsOrderingWithTransactions = context.isInTransaction(); producerExchange.incrementSend(); checkUsage(context, producerExchange, message); @@ -922,26 +844,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { throw e; } } - // did a transaction commit beat us to the index? - synchronized (orderIndexUpdates) { - needsOrderingWithTransactions |= !orderIndexUpdates.isEmpty(); - } - if (needsOrderingWithTransactions ) { - // If this is a transacted message.. increase the usage now so that - // a big TX does not blow up - // our memory. This increment is decremented once the tx finishes.. - message.incrementReferenceCount(); - - registerSendSync(message, context); - } else { - // Add to the pending list, this takes care of incrementing the - // usage manager. 
- sendMessage(message); - } + orderedCursorAdd(message, context); } finally { sendLock.unlock(); } - if (!needsOrderingWithTransactions) { + if (store == null || (!context.isInTransaction() && !message.isPersistent())) { messageSent(context, message); } if (result != null && message.isResponseRequired() && !result.isCancelled()) { @@ -954,6 +861,17 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } + private void orderedCursorAdd(Message message, ConnectionContext context) throws Exception { + if (context.isInTransaction()) { + context.getTransaction().addSynchronization(new CursorAddSync(new MessageContext(context, message, null))); + } else if (store != null && message.isPersistent()) { + doPendingCursorAdditions(); + } else { + // no ordering issue with non persistent messages + cursorAdd(message); + } + } + private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException { if (message.isPersistent()) { if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { @@ -1860,10 +1778,10 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - final void sendMessage(final Message msg) throws Exception { + final boolean cursorAdd(final Message msg) throws Exception { messagesLock.writeLock().lock(); try { - messages.addMessageLast(msg); + return messages.addMessageLast(msg); } finally { messagesLock.writeLock().unlock(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 88d31b8..fb7d69e 100755 --- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -748,6 +748,7 @@ public class RegionBroker extends EmptyBroker { if (deadLetterStrategy.isSendToDeadLetterQueue(message)) { // message may be inflight to other subscriptions so do not modify message = message.copy(); + message.getMessageId().setFutureOrSequenceLong(null); stampAsExpired(message); message.setExpiration(0); if (!message.isPersistent()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index e9c3c75..12ea104 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -82,12 +82,12 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs public void addMessageFirst(MessageReference node) throws Exception { } - public void addMessageLast(MessageReference node) throws Exception { + public boolean addMessageLast(MessageReference node) throws Exception { + return true; } public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { - addMessageLast(node); - return true; + return addMessageLast(node); } public void addRecoveredMessage(MessageReference node) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java 
---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index b6f9b7e..fad666c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -18,12 +18,13 @@ package org.apache.activemq.broker.region.cursors; import java.util.Iterator; import java.util.LinkedList; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Future; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.TransactionId; import org.apache.activemq.store.MessageRecoveryListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,8 +41,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i protected boolean batchResetNeeded = false; private boolean storeHasMessages = false; protected int size; - private MessageId lastCachedId; - private TransactionId lastTx; + private LinkedList<MessageId> pendingCachedIds = new LinkedList<>(); + MessageId lastCachedId = null; protected boolean hadSpace = false; protected AbstractStoreCursor(Destination destination) { @@ -84,7 +85,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public final boolean recoverMessage(Message message) throws Exception { return recoverMessage(message,false); } - + public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { boolean recovered = false; if 
(recordUniqueId(message.getMessageId())) { @@ -100,13 +101,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i recovered = true; storeHasMessages = true; } else { - if (LOG.isDebugEnabled()) { - LOG.debug(this + " - cursor got duplicate: " + message.getMessageId() + "," + message.getPriority() + ", cached=" + cached, new Throwable("duplicate message detected")); - } else { - LOG.warn("{} - cursor got duplicate {}", regionDestination.getActiveMQDestination(), message.getMessageId()); - } - if (!cached || message.getMessageId().getEntryLocator() != null) { - // came from the store or was added to the jdbc store + LOG.warn("{} - cursor got duplicate {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong()); + + // a duplicate from the store - needs to be removed/acked - otherwise it will get redispatched on restart + // jdbc store will store duplicates and will set entry locator to sequence long. + // REVISIT - this seems too hacky - see use case AMQ4952Test + if (!cached || message.getMessageId().getEntryLocator() instanceof Long) { duplicate(message); } } @@ -189,21 +189,24 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return result; } - - public final synchronized void addMessageLast(MessageReference node) throws Exception { + public final synchronized boolean addMessageLast(MessageReference node) throws Exception { boolean disableCache = false; if (hasSpace()) { if (!isCacheEnabled() && size==0 && isStarted() && useCache) { - LOG.trace("{} - enabling cache for empty store {}", this, node.getMessageId()); + LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong()); setCacheEnabled(true); } if (isCacheEnabled()) { if (recoverMessage(node.getMessage(),true)) { - lastCachedId = node.getMessageId(); - lastTx = node.getMessage().getTransactionId(); + if 
(node.getMessageId().getFutureOrSequenceLong() instanceof Future) { + pruneLastCached(); + pendingCachedIds.add(node.getMessageId()); + } else { + setLastCachedId(node.getMessageId()); + } } else { dealWithDuplicates(); - return; + return false; } } } else { @@ -213,16 +216,62 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i if (disableCache && isCacheEnabled()) { setCacheEnabled(false); // sync with store on disabling the cache - if (lastCachedId != null) { - LOG.debug("{} - disabling cache, lastCachedId: {} last-tx: {} current node Id: {} node-tx: {} batchList size: {}", - new Object[]{ this, lastCachedId, lastTx, node.getMessageId(), node.getMessage().getTransactionId(), batchList.size() }); - setBatch(lastCachedId); - lastCachedId = null; - lastTx = null; + if (!pendingCachedIds.isEmpty() || lastCachedId != null) { + LOG.trace("{} - disabling cache. current Id: {} seq: {}, batchList size: {}", + new Object[]{this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong(), batchList.size()}); + collapseLastCachedIds(); + if (lastCachedId != null) { + setBatch(lastCachedId); + lastCachedId = null; + } } } this.storeHasMessages = true; size++; + return true; + } + + + private void pruneLastCached() { + for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) { + MessageId candidate = it.next(); + final Object futureOrLong = candidate.getFutureOrSequenceLong(); + if (futureOrLong instanceof Future) { + Future future = (Future) futureOrLong; + if (future.isCancelled()) { + it.remove(); + } + } else { + // store complete - track via lastCachedId + setLastCachedId(candidate); + it.remove(); + } + } + } + + private void collapseLastCachedIds() throws Exception { + for (MessageId candidate : pendingCachedIds) { + final Object futureOrLong = candidate.getFutureOrSequenceLong(); + if (futureOrLong instanceof Future) { + Future future = (Future) futureOrLong; + try { + future.get(); + // future should be 
replaced with sequence by this time + } catch (CancellationException ignored) { + continue; + } + } + setLastCachedId(candidate); + } + pendingCachedIds.clear(); + } + + private void setLastCachedId(MessageId candidate) { + if (lastCachedId == null) { + lastCachedId = candidate; + } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedId.getFutureOrSequenceLong())) > 0) { + lastCachedId = candidate; + } } protected void setBatch(MessageId messageId) throws Exception { @@ -260,8 +309,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public synchronized void gc() { - for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) { - MessageReference msg = i.next(); + for (MessageReference msg : batchList) { rollback(msg.getMessageId()); msg.decrementReferenceCount(); } @@ -272,7 +320,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } protected final synchronized void fillBatch() { - //LOG.trace("{} - fillBatch", this); if (batchResetNeeded) { resetSize(); setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size)); @@ -313,7 +360,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i public String toString() { return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled() - + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace(); + + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastCachedId:" + lastCachedId; } protected abstract void doFillBatch() throws Exception; http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java 
---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 2769e68..7512e39 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -203,8 +203,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple * @throws Exception */ @Override - public synchronized void addMessageLast(MessageReference node) throws Exception { - tryAddMessageLast(node, 0); + public synchronized boolean addMessageLast(MessageReference node) throws Exception { + return tryAddMessageLast(node, 0); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index 1fecb95..06d59f1 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -81,10 +81,12 @@ public interface PendingMessageCursor extends Service { * add message to await dispatch * * @param node + * @return boolean true if successful, false if cursor traps a duplicate * @throws IOException * @throws Exception */ - void addMessageLast(MessageReference node) throws Exception; + boolean addMessageLast(MessageReference node) throws Exception; + /** * add message to 
await dispatch - if it can * http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index c89b648..1f42d57 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -94,6 +94,7 @@ class QueueStorePrefetch extends AbstractStoreCursor { @Override protected void setBatch(MessageId messageId) throws Exception { + LOG.trace("{} setBatch {} loc: {}", this, messageId, messageId.getEntryLocator()); store.setBatch(messageId); batchResetNeeded = false; } http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 6d03eeb..1820e7b 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -183,7 +183,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } @Override - public synchronized void addMessageLast(MessageReference node) throws Exception { + public synchronized boolean addMessageLast(MessageReference node) throws Exception { if (node 
!= null) { Message msg = node.getMessage(); if (isStarted()) { @@ -206,6 +206,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } } + return true; } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index b3f4261..5b072a6 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -87,7 +87,8 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { pendingCount = 0; } - public synchronized void addMessageLast(MessageReference node) throws Exception { + public synchronized boolean addMessageLast(MessageReference node) throws Exception { + boolean result = true; if (node != null) { Message msg = node.getMessage(); if (started) { @@ -97,9 +98,10 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { } } if (msg.isPersistent()) { - persistent.addMessageLast(node); + result = persistent.addMessageLast(node); } } + return result; } public synchronized void addMessageFirst(MessageReference node) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java index 9518981..15c61df 
100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java @@ -97,15 +97,15 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { * @param node */ - public synchronized void addMessageLast(MessageReference node) { + public synchronized boolean addMessageLast(MessageReference node) { node.incrementReferenceCount(); list.addMessageLast(node); + return true; } /** * add message to await dispatch * - * @param position * @param node */ http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java index df8658f..43713e6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java @@ -29,6 +29,7 @@ abstract public class AbstractMessageStore implements MessageStore { public static final ListenableFuture<Object> FUTURE; protected final ActiveMQDestination destination; protected boolean prioritizedMessages; + protected IndexListener indexListener; public AbstractMessageStore(ActiveMQDestination destination) { this.destination = destination; @@ -114,10 +115,16 @@ abstract public class AbstractMessageStore implements MessageStore { removeMessage(context, ack); } + @Override public void updateMessage(Message message) throws IOException { throw new IOException("update is not supported by: " + this); } + @Override + public void registerIndexListener(IndexListener indexListener) { + this.indexListener = indexListener; + } + static { FUTURE = new 
InlineListenableFuture(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java b/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java new file mode 100644 index 0000000..66902dc --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.store; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.Message; + +/** + * callback when the index is updated, allows ordered work to be seen by destinations + */ +public interface IndexListener { + + final class MessageContext { + public Message message; + public ConnectionContext context; + public Runnable onCompletion; + public boolean duplicate; + + public MessageContext(ConnectionContext context, Message message, Runnable onCompletion) { + this.context = context; + this.message = message; + this.onCompletion = onCompletion; + } + } + + /** + * called with some global index lock held so that a listener can do order + * dependent work + * non null MessageContext.onCompletion called when work is done + */ + public void onAdd(MessageContext messageContext); + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java index 400245a..4cc472e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java @@ -195,4 +195,6 @@ public interface MessageStore extends Service { public boolean isPrioritizedMessages(); void updateMessage(Message message) throws IOException; + + void registerIndexListener(IndexListener indexListener); } http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index 3ddfadb..8c747e8 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -160,4 +160,9 @@ public class ProxyMessageStore implements MessageStore { public void updateMessage(Message message) throws IOException { delegate.updateMessage(message); } + + @Override + public void registerIndexListener(IndexListener indexListener) { + delegate.registerIndexListener(indexListener); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index de4d195..0f47f61 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -208,4 +208,9 @@ public class ProxyTopicMessageStore implements TopicMessageStore { public void updateMessage(Message message) throws IOException { delegate.updateMessage(message); } + + @Override + public void registerIndexListener(IndexListener indexListener) { + delegate.registerIndexListener(indexListener); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index 836b388..7cdaa78 100755 --- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -28,6 +28,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.IndexListener; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.AbstractMessageStore; @@ -41,6 +42,7 @@ public class MemoryMessageStore extends AbstractMessageStore { protected final Map<MessageId, Message> messageTable; protected MessageId lastBatchId; + protected long sequenceId; public MemoryMessageStore(ActiveMQDestination destination) { this(destination, new LinkedHashMap<MessageId, Message>()); @@ -56,6 +58,10 @@ public class MemoryMessageStore extends AbstractMessageStore { messageTable.put(message.getMessageId(), message); } message.incrementReferenceCount(); + message.getMessageId().setFutureOrSequenceLong(sequenceId++); + if (indexListener != null) { + indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); + } } // public void addMessageReference(ConnectionContext context,MessageId http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java b/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java index 5c3069f..de8cc12 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java @@ -37,6 +37,7 @@ public class MessageId implements DataStructure, Comparable<MessageId> { private transient AtomicReference<Object> dataLocator = 
new AtomicReference<Object>(); private transient Object entryLocator; private transient Object plistLocator; + private transient Object futureOrSequenceLong; public MessageId() { this.producerId = new ProducerId(); @@ -186,6 +187,7 @@ public class MessageId implements DataStructure, Comparable<MessageId> { copy.brokerSequenceId = brokerSequenceId; copy.dataLocator = dataLocator; copy.entryLocator = entryLocator; + copy.futureOrSequenceLong = futureOrSequenceLong; copy.plistLocator = plistLocator; copy.textView = textView; return copy; @@ -219,6 +221,14 @@ public class MessageId implements DataStructure, Comparable<MessageId> { this.dataLocator.set(value); } + public Object getFutureOrSequenceLong() { + return futureOrSequenceLong; + } + + public void setFutureOrSequenceLong(Object futureOrSequenceLong) { + this.futureOrSequenceLong = futureOrSequenceLong; + } + public Object getEntryLocator() { return entryLocator; } http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index c3d5594..9f53cc1 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -32,6 +32,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.AbstractMessageStore; +import org.apache.activemq.store.IndexListener; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequenceData; @@ -101,7 +102,7 @@ public 
class JDBCMessageStore extends AbstractMessageStore { } } - public void addMessage(ConnectionContext context, Message message) throws IOException { + public void addMessage(final ConnectionContext context, final Message message) throws IOException { MessageId messageId = message.getMessageId(); if (audit != null && audit.isDuplicate(message)) { if (LOG.isDebugEnabled()) { @@ -126,8 +127,26 @@ public class JDBCMessageStore extends AbstractMessageStore { long sequenceId; synchronized (pendingAdditions) { sequenceId = persistenceAdapter.getNextSequenceId(); - if (message.isInTransaction()) { - trackPendingSequence(c, sequenceId); + final long sequence = sequenceId; + pendingAdditions.add(sequence); + c.onCompletion(new Runnable() { + public void run() { + // message added to db + message.getMessageId().setFutureOrSequenceLong(sequence); + message.getMessageId().setEntryLocator(sequence); + } + }); + + if (indexListener != null) { + indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { + @Override + public void run() { + // cursor add complete + synchronized (pendingAdditions) { pendingAdditions.remove(sequence); } + } + })); + } else { + pendingAdditions.remove(sequence); } } try { @@ -139,20 +158,10 @@ public class JDBCMessageStore extends AbstractMessageStore { } finally { c.close(); } - message.getMessageId().setEntryLocator(sequenceId); onAdd(message, sequenceId, message.getPriority()); } // jdbc commit order is random with concurrent connections - limit scan to lowest pending - private void trackPendingSequence(final TransactionContext transactionContext, final long sequenceId) { - synchronized (pendingAdditions) { pendingAdditions.add(sequenceId); } - transactionContext.onCompletion(new Runnable() { - public void run() { - synchronized (pendingAdditions) { pendingAdditions.remove(sequenceId); } - } - }); - } - private long minPendingSequeunceId() { synchronized (pendingAdditions) { if (!pendingAdditions.isEmpty()) { @@ -237,8 
+246,8 @@ public class JDBCMessageStore extends AbstractMessageStore { public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { - long seq = ack.getLastMessageId().getEntryLocator() != null ? - (Long) ack.getLastMessageId().getEntryLocator() : + long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ? + (Long) ack.getLastMessageId().getFutureOrSequenceLong() : persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0]; // Get a connection and remove the message from the DB @@ -251,9 +260,9 @@ public class JDBCMessageStore extends AbstractMessageStore { } finally { c.close(); } - if (context != null && context.getXid() != null) { - ack.getLastMessageId().setEntryLocator(seq); - } + //if (context != null && context.getXid() != null) { + // ack.getLastMessageId().setEntryLocator(seq); + //} } public void recover(final MessageRecoveryListener listener) throws Exception { @@ -341,7 +350,7 @@ public class JDBCMessageStore extends AbstractMessageStore { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); - msg.getMessageId().setEntryLocator(sequenceId); + msg.getMessageId().setFutureOrSequenceLong(sequenceId); listener.recoverMessage(msg); lastRecoveredSequenceId.set(sequenceId); lastRecoveredPriority.set(msg.getPriority()); http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index b4fb5d5..0a6dde8 100755 --- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -773,7 +773,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements public void commitAdd(ConnectionContext context, MessageId messageId) throws IOException { TransactionContext c = getTransactionContext(context); try { - long sequence = (Long)messageId.getEntryLocator(); + long sequence = (Long)messageId.getFutureOrSequenceLong(); getAdapter().doCommitAddOp(c, sequence); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); @@ -786,7 +786,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException { TransactionContext c = getTransactionContext(context); try { - getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getEntryLocator(), null); + getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getFutureOrSequenceLong(), null); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". 
Reason: " + e,e); http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java index 4128eef..b2fedf7 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java @@ -108,7 +108,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { jdbcPersistenceAdapter.commitAdd(context, message.getMessageId()); ((JDBCMessageStore)addMessageCommand.getMessageStore()).onAdd( message, - (Long)message.getMessageId().getEntryLocator(), + (Long)message.getMessageId().getFutureOrSequenceLong(), message.getPriority()); } @@ -170,7 +170,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { public void recoverAdd(long id, byte[] messageBytes) throws IOException { final Message message = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes)); - message.getMessageId().setEntryLocator(id); + message.getMessageId().setFutureOrSequenceLong(id); Tx tx = getPreparedTx(message.getTransactionId()); tx.add(new AddMessageCommand() { MessageStore messageStore; @@ -187,7 +187,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { @Override public void run(ConnectionContext context) throws IOException { ((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId()); - ((JDBCMessageStore)messageStore).onAdd(message, ((Long)message.getMessageId().getEntryLocator()).longValue(), message.getPriority()); + 
((JDBCMessageStore)messageStore).onAdd(message, ((Long)message.getMessageId().getFutureOrSequenceLong()).longValue(), message.getPriority()); } @Override @@ -200,7 +200,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { public void recoverAck(long id, byte[] xid, byte[] message) throws IOException { Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message)); - msg.getMessageId().setEntryLocator(id); + msg.getMessageId().setFutureOrSequenceLong(id); Tx tx = getPreparedTx(new XATransactionId(xid)); final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1); tx.add(new RemoveMessageCommand() { http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 0087ac9..970e0f8 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -396,7 +396,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { if (this.batchStatements) { s.addBatch(); } else if (s.executeUpdate() != 1) { - throw new SQLException("Failed to remove message"); + throw new SQLException("Failed to remove message seq: " + seq); } } finally { cleanupExclusiveLock.readLock().unlock(); @@ -935,7 +935,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { this.batchStatements = batchStatements; // The next lines are deprecated and should be removed in a future release // and is here in case someone created their own - this.batchStatments = batchStatements; + // 
this.batchStatments = batchStatements; } // Note - remove batchStatment in future distributions. Here for backward compatibility @@ -1168,8 +1168,12 @@ public class DefaultJDBCAdapter implements JDBCAdapter { printQuery(s,System.out); } public static void dumpTables(java.sql.Connection c) throws SQLException { - printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out); - printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); + printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_MSGS", System.out); + + //printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_ACKS", System.out); + + //printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out); + //printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); } public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out) http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 975cd05..54cfd7d 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutorService; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; @@ -57,6 +59,7 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.Buffer; import 
org.apache.activemq.store.AbstractMessageStore; +import org.apache.activemq.store.IndexListener; import org.apache.activemq.store.ListenableFuture; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; @@ -83,7 +86,7 @@ import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { +public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, MessageDatabase.SerialExecution<Location> { static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class); private static final int MAX_ASYNC_JOBS = 10000; @@ -121,6 +124,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return txid; } }; + serialExecutor = this; } @Override @@ -207,7 +211,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { // In case the recovered store used a different OpenWire version log a warning // to assist in determining why journal reads fail. 
if (metadata.openwireVersion != brokerService.getStoreOpenWireVersion()) { - LOG.warn("Receovered Store uses a different OpenWire version[{}] " + + LOG.warn("Recovered Store uses a different OpenWire version[{}] " + "than the version configured[{}].", metadata.openwireVersion, brokerService.getStoreOpenWireVersion()); } @@ -286,21 +290,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { super.doStop(stopper); } - @Override - void rollbackStatsOnDuplicate(KahaDestination commandDestination) { - if (brokerService != null) { - RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); - if (regionBroker != null) { - ActiveMQDestination activeMQDestination = convert(commandDestination); - Destination destination = regionBroker.getDestinationMap(activeMQDestination).get(activeMQDestination); - if (destination != null) { - destination.getDestinationStatistics().getMessages().decrement(); - destination.getDestinationStatistics().getEnqueues().decrement(); - } - } - } - } - private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() { @Override @@ -358,6 +347,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { this.forceRecoverIndex = forceRecoverIndex; } + @Override + public Location execute(Callable<Location> c) throws Exception { + if (isConcurrentStoreAndDispatchQueues()) { + FutureTask<Location> future = new FutureTask<>(c); + this.queueExecutor.execute(future); + return future.get(); + } else { + return c.call(); + } + } + public class KahaDBMessageStore extends AbstractMessageStore { protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>(); protected KahaDestination dest; @@ -385,7 +385,25 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { StoreQueueTask result = new 
StoreQueueTask(this, context, message); result.aquireLocks(); addQueueTask(this, result); - return result.getFuture(); + final ListenableFuture<Object> future = result.getFuture(); + if (indexListener != null) { + // allow concurrent dispatch by setting entry locator, + // wait for add completion to remove potential pending addition + message.getMessageId().setFutureOrSequenceLong(future); + indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { + @Override + public void run() { + try { + future.get(); + trackPendingAddComplete(dest, (Long) message.getMessageId().getFutureOrSequenceLong()); + } catch (CancellationException okNothingToTrack) { + } catch (Exception e) { + LOG.warn("{} unexpected exception tracking completion of async add of {}", this, message.getMessageId(), e); + } + } + })); + } + return future; } else { return super.asyncAddQueueMessage(context, message); } @@ -423,7 +441,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } @Override - public void addMessage(ConnectionContext context, Message message) throws IOException { + public void addMessage(final ConnectionContext context, final Message message) throws IOException { KahaAddMessageCommand command = new KahaAddMessageCommand(); command.setDestination(dest); command.setMessageId(message.getMessageId().toProducerKey()); @@ -432,8 +450,25 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { command.setPrioritySupported(isPrioritizedMessages()); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null); - + store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { + @Override + public void sequenceAssignedWithIndexLocked(final long sequence) { + final Object 
possibleFuture = message.getMessageId().getFutureOrSequenceLong(); + message.getMessageId().setFutureOrSequenceLong(sequence); + if (indexListener != null) { + trackPendingAdd(dest, sequence); + if (possibleFuture == null) { + // sync add (for async future present from getFutureOrSequenceLong) + indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { + @Override + public void run() { + trackPendingAddComplete(dest, sequence); + } + })); + } + } + } + }, null); } @Override @@ -582,6 +617,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { continue; } Message msg = loadMessage(entry.getValue().location); + msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); listener.recoverMessage(msg); counter++; if (counter >= maxReturned) { @@ -643,7 +679,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } @Override - public void setBatch(MessageId identity) throws IOException { + public void setBatch(final MessageId identity) throws IOException { try { final String key = identity.toProducerKey(); lockAsyncJobQueue(); @@ -660,6 +696,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { Long location = sd.messageIdIndex.get(tx, key); if (location != null) { sd.orderIndex.setBatch(tx, location); + } else { + LOG.warn("{} Location {} not found in order index for {}", this, identity.getFutureOrSequenceLong(), identity); } } }); http://git-wip-us.apache.org/repos/asf/activemq/blob/54e2e3be/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java index 47a9c34..7a79ddd 100755 --- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java @@ -42,9 +42,7 @@ import org.apache.activemq.store.ProxyTopicMessageStore; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionStore; -import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation; import org.apache.activemq.store.kahadb.MessageDatabase.Operation; -import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation; import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; @@ -254,7 +252,7 @@ public class KahaDBTransactionStore implements TransactionStore { return tx; } - public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) + public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit) throws IOException { if (txid != null) { if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) { @@ -294,7 +292,10 @@ public class KahaDBTransactionStore implements TransactionStore { } else { KahaTransactionInfo info = getTransactionInfo(txid); - theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), preCommit, postCommit); + if (preCommit != null) { + preCommit.run(); + } + theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit); forgetRecoveredAcks(txid, false); } }else { @@ -336,13 +337,13 @@ public class KahaDBTransactionStore implements TransactionStore { ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); for (Operation op : entry.getValue()) { - if (op.getClass() == 
AddOpperation.class) { - AddOpperation addOp = (AddOpperation) op; + if (op.getClass() == MessageDatabase.AddOperation.class) { + MessageDatabase.AddOperation addOp = (MessageDatabase.AddOperation) op; Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage() .newInput())); messageList.add(msg); } else { - RemoveOpperation rmOp = (RemoveOpperation) op; + MessageDatabase.RemoveOperation rmOp = (MessageDatabase.RemoveOperation) op; Buffer ackb = rmOp.getCommand().getAck(); MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput())); ackList.add(ack);