This is an automated email from the ASF dual-hosted git repository.
markt-asf pushed a commit to branch 11.0.x
in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/11.0.x by this push:
new e70be58c79 Fix concurrency issues in TwoPhaseCommitInterceptor
e70be58c79 is described below
commit e70be58c79dfc297f795a9da9e0beb74a89b1a13
Author: Mark Thomas <[email protected]>
AuthorDate: Tue Jun 9 15:02:05 2026 +0100
Fix concurrency issues in TwoPhaseCommitInterceptor
---
.../interceptors/TwoPhaseCommitInterceptor.java | 22 +++++++++++-----------
webapps/docs/changelog.xml | 4 ++++
2 files changed, 15 insertions(+), 11 deletions(-)
diff --git
a/java/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
b/java/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
index d91ad9022b..3c9ca7bbc8 100644
---
a/java/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
+++
b/java/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
@@ -16,8 +16,9 @@
*/
package org.apache.catalina.tribes.group.interceptors;
-import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
@@ -55,7 +56,7 @@ public class TwoPhaseCommitInterceptor extends
ChannelInterceptorBase {
/**
* Map of pending messages keyed by their unique ID.
*/
- protected final HashMap<UniqueId,MapEntry> messages = new HashMap<>();
+ protected final Map<UniqueId,MapEntry> messages = new
ConcurrentHashMap<>();
/**
* Message expiration time in milliseconds.
@@ -105,10 +106,9 @@ public class TwoPhaseCommitInterceptor extends
ChannelInterceptorBase {
END_DATA, 0, END_DATA.length)) {
UniqueId id =
new UniqueId(msg.getMessage().getBytesDirect(),
START_DATA.length, msg.getUniqueId().length);
- MapEntry original = messages.get(id);
+ MapEntry original = messages.remove(id);
if (original != null) {
super.messageReceived(original.msg);
- messages.remove(id);
} else {
log.warn(sm.getString("twoPhaseCommitInterceptor.originalMessage.missing",
Arrays.toString(id.getBytes())));
@@ -159,13 +159,13 @@ public class TwoPhaseCommitInterceptor extends
ChannelInterceptorBase {
public void heartbeat() {
try {
long now = System.currentTimeMillis();
- @SuppressWarnings("unchecked")
- Map.Entry<UniqueId,MapEntry>[] entries =
messages.entrySet().toArray(new Map.Entry[0]);
- for (Map.Entry<UniqueId,MapEntry> uniqueIdMapEntryEntry : entries)
{
- MapEntry entry = uniqueIdMapEntryEntry.getValue();
- if (entry.expired(now, expire)) {
-
log.info(sm.getString("twoPhaseCommitInterceptor.expiredMessage", entry.id));
- messages.remove(entry.id);
+ Iterator<Map.Entry<UniqueId,MapEntry>> iter =
messages.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<UniqueId,MapEntry> entry = iter.next();
+ MapEntry value = entry.getValue();
+ if (value.expired(now, expire)) {
+
log.info(sm.getString("twoPhaseCommitInterceptor.expiredMessage", value.id));
+ iter.remove();
}
}
} catch (Exception e) {
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 34a7b6e5d5..71aeb9f13e 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -363,6 +363,10 @@
Fix some concurrency issues in <code>OrderInterceptor</code>.
(markt)
</fix>
+ <fix>
+ Fix some concurrency issues in <code>TwoPhaseCommitInterceptor</code>.
+ (markt)
+ </fix>
</changelog>
</subsection>
<subsection name="WebSocket">
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]