Author: gtully
Date: Fri Oct 15 12:29:39 2010
New Revision: 1022890
URL: http://svn.apache.org/viewvc?rev=1022890&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2980 for kahaDB
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1022890&r1=1022889&r2=1022890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Oct 15 12:29:39 2010
@@ -717,7 +717,7 @@ public class KahaDBStore extends Message
public int getMessageCount(String clientId, String subscriptionName)
throws IOException {
final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId,
subscriptionName);
- indexLock.readLock().lock();
+ indexLock.writeLock().lock();
try {
return pageFile.tx().execute(new
Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
@@ -727,8 +727,8 @@ public class KahaDBStore extends Message
// The subscription might not exist.
return 0;
}
- MessageOrderCursor moc = new
MessageOrderCursor(cursorPos + 1);
-
+ sd.orderIndex.resetCursorPosition();
+ sd.orderIndex.setBatch(tx, cursorPos);
int counter = 0;
try {
String selector = info.getSelector();
@@ -736,7 +736,7 @@ public class KahaDBStore extends Message
if (selector != null) {
selectorExpression =
SelectorParser.parse(selector);
}
- for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx, moc); iterator
+ for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry =
iterator.next();
if (selectorExpression != null) {
@@ -757,7 +757,7 @@ public class KahaDBStore extends Message
}
});
}finally {
- indexLock.readLock().unlock();
+ indexLock.writeLock().unlock();
}
}
@@ -786,15 +786,19 @@ public class KahaDBStore extends Message
public void recoverNextMessages(String clientId, String
subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
- indexLock.readLock().lock();
+ indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
+ sd.orderIndex.resetCursorPosition();
MessageOrderCursor moc =
sd.subscriptionCursors.get(subscriptionKey);
if (moc == null) {
long pos = sd.subscriptionAcks.get(tx,
subscriptionKey);
- moc = new MessageOrderCursor(pos+1);
+ sd.orderIndex.setBatch(tx, pos);
+ moc = sd.orderIndex.cursor;
+ } else {
+ sd.orderIndex.cursor.sync(moc);
}
Entry<Long, MessageKeys> entry = null;
@@ -813,11 +817,14 @@ public class KahaDBStore extends Message
if (entry != null) {
MessageOrderCursor copy =
sd.orderIndex.cursor.copy();
sd.subscriptionCursors.put(subscriptionKey, copy);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("updated moc: " + copy + ",
recovered: " + counter);
+ }
}
}
});
}finally {
- indexLock.readLock().unlock();
+ indexLock.writeLock().unlock();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1022890&r1=1022889&r2=1022890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Oct 15 12:29:39 2010
@@ -1875,6 +1875,18 @@ public class MessageDatabase extends Ser
lowPriorityCursorPosition++;
}
}
+
+ public String toString() {
+ return "MessageOrderCursor:[def:" + defaultCursorPosition
+ + ", low:" + lowPriorityCursorPosition
+ + ", high:" + highPriorityCursorPosition + "]";
+ }
+
+ public void sync(MessageOrderCursor other) {
+ this.defaultCursorPosition=other.defaultCursorPosition;
+ this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
+ this.highPriorityCursorPosition=other.highPriorityCursorPosition;
+ }
}
class MessageOrderIndex{
@@ -2010,11 +2022,11 @@ public class MessageDatabase extends Ser
void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>>
deletes, Long sequenceId)
throws IOException {
- getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
- if (highPriorityIndex != null) {
+ if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
+ getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
+ } else if (highPriorityIndex != null &&
highPriorityIndex.containsKey(tx, sequenceId)) {
getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
- }
- if (lowPriorityIndex != null) {
+ } else if (lowPriorityIndex != null &&
lowPriorityIndex.containsKey(tx, sequenceId)) {
getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
}
}
@@ -2073,7 +2085,6 @@ public class MessageDatabase extends Ser
final Iterator<Entry<Long, MessageKeys>>highIterator;
final Iterator<Entry<Long, MessageKeys>>defaultIterator;
final Iterator<Entry<Long, MessageKeys>>lowIterator;
- Long lastKey;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1022890&r1=1022889&r2=1022890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Fri Oct 15 12:29:39 2010
@@ -27,6 +27,7 @@ import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -49,7 +50,8 @@ abstract public class MessagePriorityTes
Session sess;
public boolean useCache;
-
+ public int prefetchVal = 500;
+
int MSG_NUM = 1000;
int HIGH_PRI = 7;
int LOW_PRI = 3;
@@ -59,6 +61,7 @@ abstract public class MessagePriorityTes
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setBrokerName("priorityTest");
+ broker.setAdvisorySupport(false);
adapter = createPersistenceAdapter(true);
broker.setPersistenceAdapter(adapter);
PolicyEntry policy = new PolicyEntry();
@@ -71,6 +74,10 @@ abstract public class MessagePriorityTes
broker.waitUntilStarted();
factory = new ActiveMQConnectionFactory("vm://priorityTest");
+ ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+ prefetch.setAll(prefetchVal);
+ factory.setPrefetchPolicy(prefetch);
+ factory.setWatchTopicAdvisories(false);
conn = factory.createConnection();
conn.setClientID("priority");
conn.start();
@@ -159,6 +166,10 @@ abstract public class MessagePriorityTes
LOG.info("Sending " + text);
return msg;
}
+
+ public void initCombosForTestDurableSubs() {
+ addCombinationValues("prefetchVal", new Object[] {new Integer(1000),
new Integer(MSG_NUM/4)});
+ }
public void testDurableSubs() throws Exception {
ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
@@ -176,11 +187,45 @@ abstract public class MessagePriorityTes
sub = sess.createDurableSubscriber(topic, "priority");
for (int i = 0; i < MSG_NUM * 2; i++) {
- Message msg = sub.receive(1000);
+ Message msg = sub.receive(5000);
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ?
HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
}
+
+ public void initCombosForTestDurableSubsReconnect() {
+ addCombinationValues("prefetchVal", new Object[] {new Integer(1000),
new Integer(MSG_NUM/2)});
+ }
+
+ public void testDurableSubsReconnect() throws Exception {
+ ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
+ final String subName = "priorityDisconnect";
+ TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
+ sub.close();
+
+ ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
+ ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
+
+ lowPri.start();
+ highPri.start();
+
+ lowPri.join();
+ highPri.join();
+
+
+ final int closeFrequency = MSG_NUM/4;
+ sub = sess.createDurableSubscriber(topic, subName);
+ for (int i = 0; i < MSG_NUM * 2; i++) {
+ Message msg = sub.receive(5000);
+ assertNotNull("Message " + i + " was null", msg);
+ assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ?
HIGH_PRI : LOW_PRI, msg.getJMSPriority());
+ if (i>0 && i%closeFrequency==0) {
+ LOG.info("Closing durable sub.. on: " + i);
+ sub.close();
+ sub = sess.createDurableSubscriber(topic, subName);
+ }
+ }
+ }
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1022890&r1=1022889&r2=1022890&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Fri Oct 15 12:29:39 2010
@@ -40,4 +40,9 @@ public class JDBCMessagePriorityTest ext
return suite(JDBCMessagePriorityTest.class);
}
+ // pending fix...
+ @Override
+ public void testDurableSubsReconnect() throws Exception {
+ // TODO: fix jdbc durable sub recovery
+ }
}