AMQ4677Test.testSendAndReceiveAllMessages - demoed the lack of reference increment for transacted send and the non completion of transacted futures in leveldb
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8a37f973 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8a37f973 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8a37f973 Branch: refs/heads/trunk Commit: 8a37f97315425d51744ea5fd139f8ab91469d2ca Parents: 862f503 Author: gtully <gary.tu...@gmail.com> Authored: Sat Aug 30 23:36:05 2014 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Sat Aug 30 23:36:05 2014 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/activemq/broker/region/Queue.java | 1 + .../main/java/org/apache/activemq/store/IndexListener.java | 6 +++--- .../main/scala/org/apache/activemq/leveldb/DBManager.scala | 9 +++++++-- .../scala/org/apache/activemq/leveldb/LevelDBStore.scala | 1 - 4 files changed, 11 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8a37f973/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index d1605e2..e9f2180 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -797,6 +797,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index CursorAddSync(MessageContext messageContext) { this.messageContext = messageContext; + this.messageContext.message.incrementReferenceCount(); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/8a37f973/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java b/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java index 66902dc..2c91e91 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java @@ -25,9 +25,9 @@ import org.apache.activemq.command.Message; public interface IndexListener { final class MessageContext { - public Message message; - public ConnectionContext context; - public Runnable onCompletion; + public final Message message; + public final ConnectionContext context; + public final Runnable onCompletion; public boolean duplicate; public MessageContext(ConnectionContext context, Message message, Runnable onCompletion) { http://git-wip-us.apache.org/repos/asf/activemq/blob/8a37f973/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 34bcc6a..d40d947 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -330,8 +330,13 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { val entry = QueueEntryRecord(id, queueKey, queueSeq) assert(id.getEntryLocator == null) id.setEntryLocator(EntryLocator(queueKey, queueSeq)) - id.setFutureOrSequenceLong(countDownFuture) - countDownFuture.id = id + if (message.getTransactionId!=null) { + // why does future not get set in tx? + id.setFutureOrSequenceLong(queueSeq) + } else { + id.setFutureOrSequenceLong(countDownFuture) + countDownFuture.id = id + } val a = this.synchronized { if( !delay ) http://git-wip-us.apache.org/repos/asf/activemq/blob/8a37f973/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 6e3faff..451bc04 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -684,7 +684,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P messageContext.message.decrementReferenceCount() }) val future = uow.enqueue(key, seq, messageContext.message, delay) - messageContext.message.getMessageId.setFutureOrSequenceLong(future) if (indexListener != null) { indexListener.onAdd(messageContext) }