AMQ4677Test.testSendAndReceiveAllMessages - demoed the lack of reference 
increment for transacted send and the non completion of transacted futures in 
leveldb


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8a37f973
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8a37f973
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8a37f973

Branch: refs/heads/trunk
Commit: 8a37f97315425d51744ea5fd139f8ab91469d2ca
Parents: 862f503
Author: gtully <gary.tu...@gmail.com>
Authored: Sat Aug 30 23:36:05 2014 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Sat Aug 30 23:36:05 2014 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/activemq/broker/region/Queue.java  | 1 +
 .../main/java/org/apache/activemq/store/IndexListener.java  | 6 +++---
 .../main/scala/org/apache/activemq/leveldb/DBManager.scala  | 9 +++++++--
 .../scala/org/apache/activemq/leveldb/LevelDBStore.scala    | 1 -
 4 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8a37f973/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index d1605e2..e9f2180 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -797,6 +797,7 @@ public class Queue extends BaseDestination implements Task, 
UsageListener, Index
 
         CursorAddSync(MessageContext messageContext) {
             this.messageContext = messageContext;
+            this.messageContext.message.incrementReferenceCount();
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/8a37f973/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java 
b/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java
index 66902dc..2c91e91 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/IndexListener.java
@@ -25,9 +25,9 @@ import org.apache.activemq.command.Message;
 public interface IndexListener {
 
     final class MessageContext {
-        public Message message;
-        public ConnectionContext context;
-        public Runnable onCompletion;
+        public final Message message;
+        public final ConnectionContext context;
+        public final Runnable onCompletion;
         public boolean duplicate;
 
         public MessageContext(ConnectionContext context, Message message, 
Runnable onCompletion) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/8a37f973/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index 34bcc6a..d40d947 100644
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -330,8 +330,13 @@ class DelayableUOW(val manager:DBManager) extends 
BaseRetained {
     val entry = QueueEntryRecord(id, queueKey, queueSeq)
     assert(id.getEntryLocator == null)
     id.setEntryLocator(EntryLocator(queueKey, queueSeq))
-    id.setFutureOrSequenceLong(countDownFuture)
-    countDownFuture.id = id
+    if (message.getTransactionId!=null) {
+        // why does future not get set in tx?
+       id.setFutureOrSequenceLong(queueSeq)
+    } else {
+      id.setFutureOrSequenceLong(countDownFuture)
+      countDownFuture.id = id
+    }
 
     val a = this.synchronized {
       if( !delay )

http://git-wip-us.apache.org/repos/asf/activemq/blob/8a37f973/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 6e3faff..451bc04 100644
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -684,7 +684,6 @@ class LevelDBStore extends LockableServiceSupport with 
BrokerServiceAware with P
         messageContext.message.decrementReferenceCount()
       })
       val future = uow.enqueue(key, seq, messageContext.message, delay)
-      messageContext.message.getMessageId.setFutureOrSequenceLong(future)
       if (indexListener != null) {
         indexListener.onAdd(messageContext)
       }

Reply via email to