Author: rajdavies
Date: Wed Apr 9 08:27:42 2008
New Revision: 646401
URL: http://svn.apache.org/viewvc?rev=646401&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1488
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=646401&r1=646400&r2=646401&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Apr 9 08:27:42 2008
@@ -22,9 +22,10 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
@@ -69,7 +70,7 @@
private final Object backupMutex = new Object();
private final Object sleepMutex = new Object();
private final ConnectionStateTracker stateTracker = new
ConnectionStateTracker();
- private final ConcurrentHashMap<Integer, Command> requestMap = new
ConcurrentHashMap<Integer, Command>();
+ private final Map<Integer, Command> requestMap = new
LinkedHashMap<Integer, Command>();
private URI connectedTransportURI;
private URI failedConnectTransportURI;
@@ -139,7 +140,10 @@
return;
}
if (command.isResponse()) {
- Object object =
requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId()));
+ Object object = null;
+ synchronized(requestMap) {
+ object =
requestMap.remove(Integer.valueOf(((Response)command).getCorrelationId()));
+ }
if (object != null && object.getClass() == Tracked.class) {
((Tracked)object).onResponses();
}
@@ -426,10 +430,12 @@
// then hold it in the requestMap so that we can replay
// it later.
Tracked tracked = stateTracker.track(command);
- if (tracked != null && tracked.isWaitingForResponse())
{
-
requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
- } else if (tracked == null &&
command.isResponseRequired()) {
-
requestMap.put(Integer.valueOf(command.getCommandId()), command);
+ synchronized(requestMap) {
+ if (tracked != null &&
tracked.isWaitingForResponse()) {
+
requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
+ } else if (tracked == null &&
command.isResponseRequired()) {
+
requestMap.put(Integer.valueOf(command.getCommandId()), command);
+ }
}
// Send the message.
@@ -581,7 +587,11 @@
cc.setFaultTolerant(true);
t.oneway(cc);
stateTracker.restore(t);
- for (Iterator<Command> iter2 = requestMap.values().iterator();
iter2.hasNext();) {
+ Map tmpMap = null;
+ synchronized(requestMap) {
+ tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
+ }
+ for (Iterator<Command> iter2 = tmpMap.values().iterator();
iter2.hasNext();) {
Command command = iter2.next();
t.oneway(command);
}