This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
     new 6d91d71c1 AMQ-9202 - Make sure Reentrant locks are acquired outside a try block
6d91d71c1 is described below

commit 6d91d71c1acab166a607425e88a228ba28d06bab
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Wed Feb 1 11:17:31 2023 -0500

    AMQ-9202 - Make sure Reentrant locks are acquired outside a try block
    
    This is best practice: it prevents unlock() from being attempted
    inside a finally block when the thread doesn't actually own the
    lock, which can happen when the lock attempt itself throws an
    exception (for example, when calling lockInterruptibly()).
    
    (cherry picked from commit ed924cddac90b96bdc47b215852a68155d818bcd)
---
 .../org/apache/activemq/broker/region/Queue.java   |  2 +-
 .../activemq/store/kahadb/MessageDatabase.java     | 74 +++++++++++-----------
 .../plugin/AbstractRuntimeConfigurationBroker.java | 10 +--
 3 files changed, 42 insertions(+), 44 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 4bc669046..79f897ec1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1299,8 +1299,8 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
     public void purge() throws Exception {
         ConnectionContext c = createConnectionContext();
         List<MessageReference> list = null;
+        sendLock.lock();
         try {
-            sendLock.lock();
             long originalMessageCount = 
this.destinationStatistics.getMessages().getCount();
             do {
                 doPageIn(true, false, getMaxPageSize());  // signal no expiry 
processing needed.
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 55137db18..6687c56b4 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2070,28 +2070,25 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
             //flag to know whether the ack forwarding completed without an 
exception
             boolean forwarded = false;
 
+            //acquire the checkpoint lock to prevent other threads from
+            //running a checkpoint while this is running
+            //
+            //Normally this task runs on the same executor as the checkpoint 
task
+            //so this ack compaction runner wouldn't run at the same time as 
the checkpoint task.
+            //
+            //However, there are two cases where this isn't always true.
+            //First, the checkpoint() method is public and can be called 
through the
+            //PersistenceAdapter interface by someone at the same time this is 
running.
+            //Second, a checkpoint is called during shutdown without using the 
executor.
+            //
+            //In the future it might be better to just remove the 
checkpointLock entirely
+            //and only use the executor but this would need to be examined for 
any unintended
+            //consequences
+            checkpointLock.readLock().lock();
             try {
-                //acquire the checkpoint lock to prevent other threads from
-                //running a checkpoint while this is running
-                //
-                //Normally this task runs on the same executor as the 
checkpoint task
-                //so this ack compaction runner wouldn't run at the same time 
as the checkpoint task.
-                //
-                //However, there are two cases where this isn't always true.
-                //First, the checkpoint() method is public and can be called 
through the
-                //PersistenceAdapter interface by someone at the same time 
this is running.
-                //Second, a checkpoint is called during shutdown without using 
the executor.
-                //
-                //In the future it might be better to just remove the 
checkpointLock entirely
-                //and only use the executor but this would need to be examined 
for any unintended
-                //consequences
-                checkpointLock.readLock().lock();
-
+                // Lock index to capture the ackMessageFileMap data
+                indexLock.writeLock().lock();
                 try {
-
-                    // Lock index to capture the ackMessageFileMap data
-                    indexLock.writeLock().lock();
-
                     // Map keys might not be sorted, find the earliest log 
file to forward acks
                     // from and move only those, future cycles can chip away 
at more as needed.
                     // We won't move files that are themselves rewritten on a 
previous compaction.
@@ -2210,26 +2207,27 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
 
         // Lock index while we update the ackMessageFileMap.
         indexLock.writeLock().lock();
-
-        // Update the ack map with the new locations of the acks
-        for (Entry<Integer, Set<Integer>> entry : 
updatedAckLocations.entrySet()) {
-            Set<Integer> referenceFileIds = 
metadata.ackMessageFileMap.get(entry.getKey());
-            if (referenceFileIds == null) {
-                referenceFileIds = new HashSet<>();
-                referenceFileIds.addAll(entry.getValue());
-                metadata.ackMessageFileMap.put(entry.getKey(), 
referenceFileIds);
-                metadata.ackMessageFileMapDirtyFlag.lazySet(true);
-            } else {
-                referenceFileIds.addAll(entry.getValue());
+        try {
+            // Update the ack map with the new locations of the acks
+            for (Entry<Integer, Set<Integer>> entry : 
updatedAckLocations.entrySet()) {
+                Set<Integer> referenceFileIds = 
metadata.ackMessageFileMap.get(entry.getKey());
+                if (referenceFileIds == null) {
+                    referenceFileIds = new HashSet<>();
+                    referenceFileIds.addAll(entry.getValue());
+                    metadata.ackMessageFileMap.put(entry.getKey(), 
referenceFileIds);
+                    metadata.ackMessageFileMapDirtyFlag.lazySet(true);
+                } else {
+                    referenceFileIds.addAll(entry.getValue());
+                }
             }
-        }
-
-        // remove the old location data from the ack map so that the old 
journal log file can
-        // be removed on next GC.
-        metadata.ackMessageFileMap.remove(journalToRead);
-        metadata.ackMessageFileMapDirtyFlag.lazySet(true);
 
-        indexLock.writeLock().unlock();
+            // remove the old location data from the ack map so that the old 
journal log file can
+            // be removed on next GC.
+            metadata.ackMessageFileMap.remove(journalToRead);
+            metadata.ackMessageFileMapDirtyFlag.lazySet(true);
+        } finally {
+            indexLock.writeLock().unlock();
+        }
 
         LOG.trace("ACK File Map following updates: {}", 
metadata.ackMessageFileMap);
     }
diff --git 
a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java
 
b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java
index c672579b3..c5c5ca367 100644
--- 
a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java
+++ 
b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java
@@ -77,8 +77,8 @@ public class AbstractRuntimeConfigurationBroker extends 
BrokerFilter {
     public Destination addDestination(ConnectionContext context, 
ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
         Runnable work = addDestinationWork.poll();
         if (work != null) {
+            addDestinationBarrier.writeLock().lockInterruptibly();
             try {
-                addDestinationBarrier.writeLock().lockInterruptibly();
                 do {
                     work.run();
                     work = addDestinationWork.poll();
@@ -88,8 +88,8 @@ public class AbstractRuntimeConfigurationBroker extends 
BrokerFilter {
                 addDestinationBarrier.writeLock().unlock();
             }
         } else {
+            addDestinationBarrier.readLock().lockInterruptibly();
             try {
-                addDestinationBarrier.readLock().lockInterruptibly();
                 return super.addDestination(context, destination, 
createIfTemporary);
             } finally {
                 addDestinationBarrier.readLock().unlock();
@@ -102,8 +102,8 @@ public class AbstractRuntimeConfigurationBroker extends 
BrokerFilter {
     public void addConnection(ConnectionContext context, ConnectionInfo info) 
throws Exception {
         Runnable work = addConnectionWork.poll();
         if (work != null) {
+            addConnectionBarrier.writeLock().lockInterruptibly();
             try {
-                addConnectionBarrier.writeLock().lockInterruptibly();
                 do {
                     work.run();
                     work = addConnectionWork.poll();
@@ -113,8 +113,8 @@ public class AbstractRuntimeConfigurationBroker extends 
BrokerFilter {
                 addConnectionBarrier.writeLock().unlock();
             }
         } else {
+            addConnectionBarrier.readLock().lockInterruptibly();
             try {
-                addConnectionBarrier.readLock().lockInterruptibly();
                 super.addConnection(context, info);
             } finally {
                 addConnectionBarrier.readLock().unlock();
@@ -131,8 +131,8 @@ public class AbstractRuntimeConfigurationBroker extends 
BrokerFilter {
     protected void applyDestinationWork() throws Exception {
         Runnable work = addDestinationWork.poll();
         if (work != null) {
+            addDestinationBarrier.writeLock().lockInterruptibly();
             try {
-                addDestinationBarrier.writeLock().lockInterruptibly();
                 do {
                     work.run();
                     work = addDestinationWork.poll();

Reply via email to