Repository: activemq
Updated Branches:
  refs/heads/master 6a0c65828 -> 0f0bdb21e


https://issues.apache.org/jira/browse/AMQ-6524

Fixing a thread safety issue with memoryUsage when using 
concurrentStoreAndDispatch
that was causing memory usage to get out of sync.

The InnerFutureTask class inside KahaDB was not thread safe which was
the root cause of the problem.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0f0bdb21
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0f0bdb21
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0f0bdb21

Branch: refs/heads/master
Commit: 0f0bdb21ef97c39ec7d54d164e07921611c1de08
Parents: 6a0c658
Author: Christopher L. Shannon (cshannon) <[email protected]>
Authored: Fri Dec 2 12:19:53 2016 -0500
Committer: Christopher L. Shannon (cshannon) <[email protected]>
Committed: Mon Dec 5 07:10:20 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/activemq/store/kahadb/KahaDBStore.java   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0f0bdb21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index b4f6c6f..758d0de 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.BaseDestination;
@@ -1435,10 +1436,10 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
 
         private class InnerFutureTask extends FutureTask<Object> implements 
ListenableFuture<Object>  {
 
-            private Runnable listener;
+            private final AtomicReference<Runnable> listenerRef = new 
AtomicReference<>();
+
             public InnerFutureTask(Runnable runnable) {
                 super(runnable, null);
-
             }
 
             public void setException(final Exception e) {
@@ -1456,13 +1457,14 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
 
             @Override
             public void addListener(Runnable listener) {
-                this.listener = listener;
+                this.listenerRef.set(listener);
                 if (isDone()) {
                     fireListener();
                 }
             }
 
             private void fireListener() {
+                Runnable listener = listenerRef.getAndSet(null);
                 if (listener != null) {
                     try {
                         listener.run();

Reply via email to