Author: gtully
Date: Thu Mar 22 20:01:40 2012
New Revision: 1304020
URL: http://svn.apache.org/viewvc?rev=1304020&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3768: ClassCastException when running
some Durable Consumer test cases. The root cause of the ClassCastException was
the reuse of a freed node that was still referenced as the head page id of a
ListIndex. The fix is to not modify the head page id of a ListIndex when
removing and coalescing linked pages; the head page remains valid for the
duration of a subscription. Eventually got a test case that could easily
reproduce the problem.
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1304020&r1=1304019&r2=1304020&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Thu Mar 22 20:01:40 2012
@@ -33,6 +33,8 @@ import org.apache.activemq.broker.jmx.Du
import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -1244,6 +1246,52 @@ public class DurableSubscriptionOfflineT
assertEquals("only one journal file left after restart", 1,
pa.getStore().getJournal().getFileMap().size());
}
+ // https://issues.apache.org/jira/browse/AMQ-3768
+ public void testPageReuse() throws Exception {
+ Connection con = null;
+ Session session = null;
+
+ final int numConsumers = 115;
+ for (int i=0; i<=numConsumers;i++) {
+ con = createConnection("cli" + i);
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", null, true);
+ session.close();
+ con.close();
+ }
+
+
+ // populate ack locations
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+ Message message = session.createTextMessage(new byte[10].toString());
+ producer.send(topic, message);
+ con.close();
+
+ // we have a split, remove all but the last so that
+ // the head pageid changes in the acklocations listindex
+ for (int i=0; i<=numConsumers -1; i++) {
+ con = createConnection("cli" + i);
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.unsubscribe("SubsId");
+ session.close();
+ con.close();
+ }
+
+ destroyBroker();
+ createBroker(false);
+
+ // create a bunch more subs to reuse the freed page and get us in a knot
+ for (int i=1; i<=numConsumers;i++) {
+ con = createConnection("cli" + i);
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", filter, true);
+ session.close();
+ con.close();
+ }
+ }
+
public static class Listener implements MessageListener {
int count = 0;
String id = null;
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java?rev=1304020&r1=1304019&r2=1304020&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
Thu Mar 22 20:01:40 2012
@@ -304,14 +304,10 @@ public class ListIndex<Key,Value> implem
ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws
IOException {
Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
- try {
- ListNode<Key, Value> node = page.get();
- node.setPage(page);
- node.setContainingList(this);
- return node;
- } catch (ClassCastException e) {
- throw e;
- }
+ ListNode<Key, Value> node = page.get();
+ node.setPage(page);
+ node.setContainingList(this);
+ return node;
}
ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws
IOException {
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java?rev=1304020&r1=1304019&r2=1304020&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
Thu Mar 22 20:01:40 2012
@@ -198,10 +198,22 @@ public final class ListNode<Key,Value> {
if (currentNode.isHead() && currentNode.isTail()) {
// store empty list
} else if (currentNode.isHead()) {
- // new head
+ // merge next node into existing headNode
+ // as we don't want to change our headPageId b/c
+ // that is our identity
+ ListNode<Key,Value> headNode = currentNode;
+ nextEntry = getFromNextNode(); // will move currentNode
+
+ if (currentNode.isTail()) {
+ targetList.setTailPageId(headNode.getPageId());
+ }
+ // copy next/currentNode into head
+ headNode.setEntries(currentNode.entries);
+ headNode.setNext(currentNode.getNext());
+ headNode.store(tx);
toRemoveNode = currentNode;
- nextEntry = getFromNextNode();
- targetList.setHeadPageId(currentNode.getPageId());
+ currentNode = headNode;
+
} else if (currentNode.isTail()) {
toRemoveNode = currentNode;
previousNode.setNext(ListIndex.NOT_SET);
@@ -262,7 +274,7 @@ public final class ListNode<Key,Value> {
@SuppressWarnings({ "unchecked", "rawtypes" })
public ListNode<Key,Value> readPayload(DataInput is) throws
IOException {
ListNode<Key,Value> node = new ListNode<Key,Value>();
- node.next = is.readLong();
+ node.setNext(is.readLong());
final short size = is.readShort();
for (short i = 0; i < size; i++) {
node.entries.addLast(