Repository: activemq Updated Branches: refs/heads/master 6a0c65828 -> 0f0bdb21e
https://issues.apache.org/jira/browse/AMQ-6524 Fixing a thread-safety issue with memoryUsage when using concurrentStoreAndDispatch that was causing memory usage to get out of sync. The InnerFutureTask class inside KahaDB was not thread-safe, which was the root cause of the problem. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0f0bdb21 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0f0bdb21 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0f0bdb21 Branch: refs/heads/master Commit: 0f0bdb21ef97c39ec7d54d164e07921611c1de08 Parents: 6a0c658 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Fri Dec 2 12:19:53 2016 -0500 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Mon Dec 5 07:10:20 2016 -0500 ---------------------------------------------------------------------- .../java/org/apache/activemq/store/kahadb/KahaDBStore.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/0f0bdb21/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index b4f6c6f..758d0de 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import 
org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.BaseDestination; @@ -1435,10 +1436,10 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, private class InnerFutureTask extends FutureTask<Object> implements ListenableFuture<Object> { - private Runnable listener; + private final AtomicReference<Runnable> listenerRef = new AtomicReference<>(); + public InnerFutureTask(Runnable runnable) { super(runnable, null); - } public void setException(final Exception e) { @@ -1456,13 +1457,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, @Override public void addListener(Runnable listener) { - this.listener = listener; + this.listenerRef.set(listener); if (isDone()) { fireListener(); } } private void fireListener() { + Runnable listener = listenerRef.getAndSet(null); if (listener != null) { try { listener.run();
