This is an automated email from the ASF dual-hosted git repository. mattrpav pushed a commit to branch activemq-5.16.x in repository https://gitbox.apache.org/repos/asf/activemq.git
commit f2dbc92743cfacf3b5397f0aae82a24dcfa4b7c1
Author: Matt Pavlovich <[email protected]>
AuthorDate: Mon Dec 20 08:37:56 2021 -0600

    [AMQ-8400] Add synchronization handling for Transaction to prevent CME (#720)
---
 .../apache/activemq/transaction/Transaction.java   | 58 +++++++++++++---------
 1 file changed, 34 insertions(+), 24 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
index 13ec353..0933480 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
@@ -52,15 +52,15 @@ public abstract class Transaction {
         public Object call() throws Exception {
             doPreCommit();
             return null;
-        }   
+        }
     });
     protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
         public Object call() throws Exception {
             doPostCommit();
             return null;
-        }   
+        }
     });
-    
+
     public byte getState() {
         return state;
     }
@@ -87,15 +87,19 @@ public abstract class Transaction {
     }
 
     public Synchronization findMatching(Synchronization r) {
-        int existing = synchronizations.indexOf(r);
-        if (existing != -1) {
-            return synchronizations.get(existing);
-        }
+        synchronized(synchronizations) {
+            int existing = synchronizations.indexOf(r);
+            if (existing != -1) {
+                return synchronizations.get(existing);
+            }
+        }
         return null;
     }
 
     public void removeSynchronization(Synchronization r) {
-        synchronizations.remove(r);
+        synchronized(synchronizations) {
+            synchronizations.remove(r);
+        }
     }
 
     public void prePrepare() throws Exception {
@@ -119,26 +123,32 @@ public abstract class Transaction {
             throw xae;
         }
     }
-    
+
     protected void fireBeforeCommit() throws Exception {
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.beforeCommit();
+        synchronized(synchronizations) {
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+                s.beforeCommit();
+            }
         }
     }
 
     protected void fireAfterCommit() throws Exception {
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.afterCommit();
+        synchronized(synchronizations) {
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+                s.afterCommit();
+            }
         }
     }
 
     public void fireAfterRollback() throws Exception {
-        Collections.reverse(synchronizations);
-        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
-            Synchronization s = iter.next();
-            s.afterRollback();
+        synchronized(synchronizations) {
+            Collections.reverse(synchronizations);
+            for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+                Synchronization s = iter.next();
+                s.afterRollback();
+            }
         }
     }
 
@@ -156,15 +166,15 @@ public abstract class Transaction {
     public abstract TransactionId getTransactionId();
 
     public abstract Logger getLog();
-    
+
     public boolean isPrepared() {
         return getState() == PREPARED_STATE;
     }
-    
+
     public int size() {
         return synchronizations.size();
     }
-    
+
     protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException {
         try {
             postCommitTask.get();
@@ -179,9 +189,9 @@ public abstract class Transaction {
             } else {
                 throw new XAException(e.toString());
             }
-        }   
+        }
     }
-    
+
     protected void doPreCommit() throws XAException {
         try {
             fireBeforeCommit();
