This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
new 6d91d71c1 AMQ-9202 - Make sure Reentrant locks are acquired outside a
try block
6d91d71c1 is described below
commit 6d91d71c1acab166a607425e88a228ba28d06bab
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Wed Feb 1 11:17:31 2023 -0500
AMQ-9202 - Make sure Reentrant locks are acquired outside a try block
This is best practice: it prevents unlock() from being attempted inside
a finally block when the thread doesn't actually own the lock, which can
happen when the lock attempt itself throws an exception (for example,
when lockInterruptibly() throws InterruptedException).
(cherry picked from commit ed924cddac90b96bdc47b215852a68155d818bcd)
---
.../org/apache/activemq/broker/region/Queue.java | 2 +-
.../activemq/store/kahadb/MessageDatabase.java | 74 +++++++++++-----------
.../plugin/AbstractRuntimeConfigurationBroker.java | 10 +--
3 files changed, 42 insertions(+), 44 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 4bc669046..79f897ec1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1299,8 +1299,8 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
public void purge() throws Exception {
ConnectionContext c = createConnectionContext();
List<MessageReference> list = null;
+ sendLock.lock();
try {
- sendLock.lock();
long originalMessageCount =
this.destinationStatistics.getMessages().getCount();
do {
doPageIn(true, false, getMaxPageSize()); // signal no expiry
processing needed.
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 55137db18..6687c56b4 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2070,28 +2070,25 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
//flag to know whether the ack forwarding completed without an
exception
boolean forwarded = false;
+ //acquire the checkpoint lock to prevent other threads from
+ //running a checkpoint while this is running
+ //
+ //Normally this task runs on the same executor as the checkpoint
task
+ //so this ack compaction runner wouldn't run at the same time as
the checkpoint task.
+ //
+ //However, there are two cases where this isn't always true.
+ //First, the checkpoint() method is public and can be called
through the
+ //PersistenceAdapter interface by someone at the same time this is
running.
+ //Second, a checkpoint is called during shutdown without using the
executor.
+ //
+ //In the future it might be better to just remove the
checkpointLock entirely
+ //and only use the executor but this would need to be examined for
any unintended
+ //consequences
+ checkpointLock.readLock().lock();
try {
- //acquire the checkpoint lock to prevent other threads from
- //running a checkpoint while this is running
- //
- //Normally this task runs on the same executor as the
checkpoint task
- //so this ack compaction runner wouldn't run at the same time
as the checkpoint task.
- //
- //However, there are two cases where this isn't always true.
- //First, the checkpoint() method is public and can be called
through the
- //PersistenceAdapter interface by someone at the same time
this is running.
- //Second, a checkpoint is called during shutdown without using
the executor.
- //
- //In the future it might be better to just remove the
checkpointLock entirely
- //and only use the executor but this would need to be examined
for any unintended
- //consequences
- checkpointLock.readLock().lock();
-
+ // Lock index to capture the ackMessageFileMap data
+ indexLock.writeLock().lock();
try {
-
- // Lock index to capture the ackMessageFileMap data
- indexLock.writeLock().lock();
-
// Map keys might not be sorted, find the earliest log
file to forward acks
// from and move only those, future cycles can chip away
at more as needed.
// We won't move files that are themselves rewritten on a
previous compaction.
@@ -2210,26 +2207,27 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
// Lock index while we update the ackMessageFileMap.
indexLock.writeLock().lock();
-
- // Update the ack map with the new locations of the acks
- for (Entry<Integer, Set<Integer>> entry :
updatedAckLocations.entrySet()) {
- Set<Integer> referenceFileIds =
metadata.ackMessageFileMap.get(entry.getKey());
- if (referenceFileIds == null) {
- referenceFileIds = new HashSet<>();
- referenceFileIds.addAll(entry.getValue());
- metadata.ackMessageFileMap.put(entry.getKey(),
referenceFileIds);
- metadata.ackMessageFileMapDirtyFlag.lazySet(true);
- } else {
- referenceFileIds.addAll(entry.getValue());
+ try {
+ // Update the ack map with the new locations of the acks
+ for (Entry<Integer, Set<Integer>> entry :
updatedAckLocations.entrySet()) {
+ Set<Integer> referenceFileIds =
metadata.ackMessageFileMap.get(entry.getKey());
+ if (referenceFileIds == null) {
+ referenceFileIds = new HashSet<>();
+ referenceFileIds.addAll(entry.getValue());
+ metadata.ackMessageFileMap.put(entry.getKey(),
referenceFileIds);
+ metadata.ackMessageFileMapDirtyFlag.lazySet(true);
+ } else {
+ referenceFileIds.addAll(entry.getValue());
+ }
}
- }
-
- // remove the old location data from the ack map so that the old
journal log file can
- // be removed on next GC.
- metadata.ackMessageFileMap.remove(journalToRead);
- metadata.ackMessageFileMapDirtyFlag.lazySet(true);
- indexLock.writeLock().unlock();
+ // remove the old location data from the ack map so that the old
journal log file can
+ // be removed on next GC.
+ metadata.ackMessageFileMap.remove(journalToRead);
+ metadata.ackMessageFileMapDirtyFlag.lazySet(true);
+ } finally {
+ indexLock.writeLock().unlock();
+ }
LOG.trace("ACK File Map following updates: {}",
metadata.ackMessageFileMap);
}
diff --git
a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java
b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java
index c672579b3..c5c5ca367 100644
---
a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java
+++
b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AbstractRuntimeConfigurationBroker.java
@@ -77,8 +77,8 @@ public class AbstractRuntimeConfigurationBroker extends
BrokerFilter {
public Destination addDestination(ConnectionContext context,
ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
Runnable work = addDestinationWork.poll();
if (work != null) {
+ addDestinationBarrier.writeLock().lockInterruptibly();
try {
- addDestinationBarrier.writeLock().lockInterruptibly();
do {
work.run();
work = addDestinationWork.poll();
@@ -88,8 +88,8 @@ public class AbstractRuntimeConfigurationBroker extends
BrokerFilter {
addDestinationBarrier.writeLock().unlock();
}
} else {
+ addDestinationBarrier.readLock().lockInterruptibly();
try {
- addDestinationBarrier.readLock().lockInterruptibly();
return super.addDestination(context, destination,
createIfTemporary);
} finally {
addDestinationBarrier.readLock().unlock();
@@ -102,8 +102,8 @@ public class AbstractRuntimeConfigurationBroker extends
BrokerFilter {
public void addConnection(ConnectionContext context, ConnectionInfo info)
throws Exception {
Runnable work = addConnectionWork.poll();
if (work != null) {
+ addConnectionBarrier.writeLock().lockInterruptibly();
try {
- addConnectionBarrier.writeLock().lockInterruptibly();
do {
work.run();
work = addConnectionWork.poll();
@@ -113,8 +113,8 @@ public class AbstractRuntimeConfigurationBroker extends
BrokerFilter {
addConnectionBarrier.writeLock().unlock();
}
} else {
+ addConnectionBarrier.readLock().lockInterruptibly();
try {
- addConnectionBarrier.readLock().lockInterruptibly();
super.addConnection(context, info);
} finally {
addConnectionBarrier.readLock().unlock();
@@ -131,8 +131,8 @@ public class AbstractRuntimeConfigurationBroker extends
BrokerFilter {
protected void applyDestinationWork() throws Exception {
Runnable work = addDestinationWork.poll();
if (work != null) {
+ addDestinationBarrier.writeLock().lockInterruptibly();
try {
- addDestinationBarrier.writeLock().lockInterruptibly();
do {
work.run();
work = addDestinationWork.poll();