Repository: activemq
Updated Branches:
  refs/heads/master 90726a60a -> 499e39e52


https://issues.apache.org/jira/browse/AMQ-6164 - allow journal write batching 
on a single destination


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/499e39e5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/499e39e5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/499e39e5

Branch: refs/heads/master
Commit: 499e39e52c392fcc6d897a526afbbd3b144121e2
Parents: 90726a6
Author: gtully <[email protected]>
Authored: Tue Feb 9 12:49:53 2016 +0000
Committer: gtully <[email protected]>
Committed: Tue Feb 9 12:50:27 2016 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 45 +++++++++-----------
 .../store/memory/MemoryMessageStore.java        | 10 ++---
 .../activemq/store/kahadb/KahaDBStore.java      | 13 +++---
 .../apache/activemq/leveldb/LevelDBStore.scala  |  5 +--
 4 files changed, 34 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/499e39e5/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 960ac9c..34817a0 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -829,33 +829,28 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         producerExchange.incrementSend();
         do {
             checkUsage(context, producerExchange, message);
-            sendLock.lockInterruptibly();
-            try {
-                
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
-                if (store != null && message.isPersistent()) {
-                    message.getMessageId().setFutureOrSequenceLong(null);
-                    try {
-                        if (messages.isCacheEnabled()) {
-                            result = store.asyncAddQueueMessage(context, 
message, isOptimizeStorage());
-                            result.addListener(new 
PendingMarshalUsageTracker(message));
-                        } else {
-                            store.addMessage(context, message);
-                        }
-                        if (isReduceMemoryFootprint()) {
-                            message.clearMarshalledState();
-                        }
-                    } catch (Exception e) {
-                        // we may have a store in inconsistent state, so reset 
the cursor
-                        // before restarting normal broker operations
-                        resetNeeded = true;
-                        throw e;
+            
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
+            if (store != null && message.isPersistent()) {
+                message.getMessageId().setFutureOrSequenceLong(null);
+                try {
+                    if (messages.isCacheEnabled()) {
+                        result = store.asyncAddQueueMessage(context, message, 
isOptimizeStorage());
+                        result.addListener(new 
PendingMarshalUsageTracker(message));
+                    } else {
+                        store.addMessage(context, message);
                     }
+                    if (isReduceMemoryFootprint()) {
+                        message.clearMarshalledState();
+                    }
+                } catch (Exception e) {
+                    // we may have a store in inconsistent state, so reset the 
cursor
+                    // before restarting normal broker operations
+                    resetNeeded = true;
+                    throw e;
                 }
-                if(tryOrderedCursorAdd(message, context)) {
-                    break;
-                }
-            } finally {
-                sendLock.unlock();
+            }
+            if(tryOrderedCursorAdd(message, context)) {
+                break;
             }
         } while (started.get());
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/499e39e5/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index b32a811..736d912 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -59,11 +59,11 @@ public class MemoryMessageStore extends 
AbstractMessageStore {
         synchronized (messageTable) {
             messageTable.put(message.getMessageId(), message);
             incMessageStoreStatistics(getMessageStoreStatistics(), message);
-        }
-        message.incrementReferenceCount();
-        message.getMessageId().setFutureOrSequenceLong(sequenceId++);
-        if (indexListener != null) {
-            indexListener.onAdd(new IndexListener.MessageContext(context, 
message, null));
+            message.incrementReferenceCount();
+            message.getMessageId().setFutureOrSequenceLong(sequenceId++);
+            if (indexListener != null) {
+                indexListener.onAdd(new IndexListener.MessageContext(context, 
message, null));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/499e39e5/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index fa4672b..e1c1df4 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -326,10 +326,9 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter {
         return task;
     }
 
+    // with asyncTaskMap locked
     protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) 
throws IOException {
-        synchronized (store.asyncTaskMap) {
-            store.asyncTaskMap.put(new 
AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
-        }
+        store.asyncTaskMap.put(new 
AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
         this.queueExecutor.execute(task);
     }
 
@@ -390,9 +389,11 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter {
                 message.getMessageId().setFutureOrSequenceLong(future);
                 message.setRecievedByDFBridge(true); // flag message as 
concurrentStoreAndDispatch
                 result.aquireLocks();
-                addQueueTask(this, result);
-                if (indexListener != null) {
-                    indexListener.onAdd(new 
IndexListener.MessageContext(context, message, null));
+                synchronized (asyncTaskMap) {
+                    addQueueTask(this, result);
+                    if (indexListener != null) {
+                        indexListener.onAdd(new 
IndexListener.MessageContext(context, message, null));
+                    }
                 }
                 return future;
             } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/499e39e5/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index a4cdcac..f80e722 100644
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -758,7 +758,7 @@ class LevelDBStore extends LockableServiceSupport with 
BrokerServiceAware with P
       uow.addCompleteListener({
         message.decrementReferenceCount()
       })
-      val sequence = lastSeq.synchronized {
+      lastSeq.synchronized {
         val seq = lastSeq.incrementAndGet()
         message.getMessageId.setFutureOrSequenceLong(seq);
         // null context on xa recovery, we want to bypass the cursor & pending 
adds as it will be reset
@@ -768,9 +768,8 @@ class LevelDBStore extends LockableServiceSupport with 
BrokerServiceAware with P
             def run(): Unit = pendingCursorAdds.synchronized { 
pendingCursorAdds.remove(seq) }
           }))
         }
-        seq
+        uow.enqueue(key, seq, message, delay)
       }
-      uow.enqueue(key, sequence, message, delay)
     }
 
     override def asyncAddQueueMessage(context: ConnectionContext, message: 
Message) = asyncAddQueueMessage(context, message, false)

Reply via email to