Author: rajdavies
Date: Sat Feb 18 06:43:23 2006
New Revision: 378727
URL: http://svn.apache.org/viewcvs?rev=378727&view=rev
Log:
synchronized around memoryTable - prevent concurrent access whilst iterating
for recovery
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=378727&r1=378726&r2=378727&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
Sat Feb 18 06:43:23 2006
@@ -72,12 +72,15 @@
}
public void recover(MessageRecoveryListener listener) throws Throwable {
- for (Iterator iter = messageTable.values().iterator();
iter.hasNext();) {
- Object msg = (Object) iter.next();
- if( msg.getClass() == String.class ) {
- listener.recoverMessageReference((String) msg);
- } else {
- listener.recoverMessage((Message) msg);
+ // the message table is a synchronizedMap - so just have to
synchronize here
+ synchronized(messageTable){
+ for(Iterator
iter=messageTable.values().iterator();iter.hasNext();){
+ Object msg=(Object) iter.next();
+ if(msg.getClass()==String.class){
+ listener.recoverMessageReference((String) msg);
+ }else{
+ listener.recoverMessage((Message) msg);
+ }
}
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?rev=378727&r1=378726&r2=378727&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
Sat Feb 18 06:43:23 2006
@@ -88,20 +88,24 @@
subscriberDatabase.remove(key);
}
- public void recoverSubscription(String clientId, String subscriptionName,
MessageRecoveryListener listener) throws Throwable {
- MessageId lastAck = (MessageId) ackDatabase.get(new
SubscriptionKey(clientId, subscriptionName));
- boolean pastLastAck = lastAck==null;
- for (Iterator iter = messageTable.entrySet().iterator();
iter.hasNext();) {
- Map.Entry entry = (Entry) iter.next();
- if( pastLastAck ) {
- Object msg = entry.getValue();
- if( msg.getClass() == String.class ) {
- listener.recoverMessageReference((String) msg);
- } else {
- listener.recoverMessage((Message) msg);
+ public void recoverSubscription(String clientId,String
subscriptionName,MessageRecoveryListener listener)
+ throws Throwable{
+ MessageId lastAck=(MessageId) ackDatabase.get(new
SubscriptionKey(clientId,subscriptionName));
+ boolean pastLastAck=lastAck==null;
+ // the message table is a synchronizedMap - so just have to
synchronize here
+ synchronized(messageTable){
+ for(Iterator
iter=messageTable.entrySet().iterator();iter.hasNext();){
+ Map.Entry entry=(Entry) iter.next();
+ if(pastLastAck){
+ Object msg=entry.getValue();
+ if(msg.getClass()==String.class){
+ listener.recoverMessageReference((String) msg);
+ }else{
+ listener.recoverMessage((Message) msg);
+ }
+ }else{
+ pastLastAck=entry.getKey().equals(lastAck);
}
- } else {
- pastLastAck = entry.getKey().equals(lastAck);
}
}
}