Author: rajdavies
Date: Wed Apr 9 09:25:38 2008
New Revision: 646422
URL: http://svn.apache.org/viewvc?rev=646422&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1623
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java?rev=646422&r1=646421&r2=646422&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySetIterator.java
Wed Apr 9 09:25:38 2008
@@ -54,6 +54,9 @@
public void remove() {
if (currentItem != null) {
container.remove(currentItem);
+ if (nextItem != null) {
+ list.refreshEntry(nextItem);
+ }
}
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=646422&r1=646421&r2=646422&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Wed Apr 9 09:25:38 2008
@@ -17,6 +17,8 @@
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -192,11 +194,15 @@
public void removeAllMessages(ConnectionContext context) throws IOException {
lock.lock();
try {
+ Set<MessageId> tmpSet = new HashSet(messageContainer.keySet());
+ for (MessageId id:tmpSet) {
+ removeMessage(id);
+ }
+ resetBatching();
messageContainer.clear();
}finally {
lock.unlock();
}
-
}
public ActiveMQDestination getDestination() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=646422&r1=646421&r2=646422&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Wed Apr 9 09:25:38 2008
@@ -17,8 +17,10 @@
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
@@ -299,6 +301,23 @@
}finally {
lock.unlock();
}
+ }
+
+ public void removeAllMessages(ConnectionContext context) throws IOException {
+ lock.lock();
+ try {
+ Set<String> tmpSet = new HashSet<String>(subscriberContainer.keySet());
+ for (String key:tmpSet) {
+ TopicSubContainer container = subscriberMessages.get(key);
+ if (container != null) {
+ container.clear();
+ }
+ }
+ ackContainer.clear();
+ }finally {
+ lock.unlock();
+ }
+ super.removeAllMessages(context);
}
protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {