Author: gtully
Date: Mon Oct 18 09:27:43 2010
New Revision: 1023704
URL: http://svn.apache.org/viewvc?rev=1023704&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2985 - the use of
selectors means replay and recovery from the beginning of the store. Unmatched
messages are removed on initial dispatch
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1023704&r1=1023703&r2=1023704&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Mon Oct 18 09:27:43 2010
@@ -150,6 +150,7 @@ public class DurableTopicSubscription ex
}
public void deactivate(boolean keepDurableSubsActive) throws Exception {
+ LOG.debug("Dectivating " + this);
active = false;
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (pending) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1023704&r1=1023703&r2=1023704&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Mon Oct 18 09:27:43 2010
@@ -727,8 +727,7 @@ public class KahaDBStore extends Message
// The subscription might not exist.
return 0;
}
- sd.orderIndex.resetCursorPosition();
- sd.orderIndex.setBatch(tx, cursorPos);
+
int counter = 0;
try {
String selector = info.getSelector();
@@ -736,6 +735,8 @@ public class KahaDBStore extends Message
if (selector != null) {
selectorExpression =
SelectorParser.parse(selector);
}
+ sd.orderIndex.resetCursorPosition();
+ sd.orderIndex.setBatch(tx, (selectorExpression !=
null? 0 : cursorPos));
for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry =
iterator.next();
@@ -764,28 +765,31 @@ public class KahaDBStore extends Message
public void recoverSubscription(String clientId, String
subscriptionName, final MessageRecoveryListener listener)
throws Exception {
final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
- indexLock.readLock().lock();
+ final SubscriptionInfo info = lookupSubscription(clientId,
subscriptionName);
+ indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx,
subscriptionKey);
- MessageOrderCursor moc = new
MessageOrderCursor(cursorPos + 1);
- for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx, moc); iterator
+ sd.orderIndex.setBatch(tx, (info.getSelector() == null
? cursorPos : 0));
+ for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
listener.recoverMessage(loadMessage(entry.getValue().location));
}
+ sd.orderIndex.resetCursorPosition();
}
});
}finally {
- indexLock.readLock().unlock();
+ indexLock.writeLock().unlock();
}
}
public void recoverNextMessages(String clientId, String
subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
+ final SubscriptionInfo info = lookupSubscription(clientId,
subscriptionName);
indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
@@ -795,7 +799,7 @@ public class KahaDBStore extends Message
MessageOrderCursor moc =
sd.subscriptionCursors.get(subscriptionKey);
if (moc == null) {
long pos = sd.subscriptionAcks.get(tx,
subscriptionKey);
- sd.orderIndex.setBatch(tx, pos);
+ sd.orderIndex.setBatch(tx, (info.getSelector() ==
null ? pos : 0));
moc = sd.orderIndex.cursor;
} else {
sd.orderIndex.cursor.sync(moc);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1023704&r1=1023703&r2=1023704&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Mon Oct 18 09:27:43 2010
@@ -2035,7 +2035,7 @@ public class MessageDatabase extends Ser
BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws
IOException {
for (Iterator<Entry<Long, MessageKeys>> iterator =
index.iterator(tx); iterator.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
- if (entry.getKey().compareTo(sequenceId) <= 0) {
+ if (entry.getKey().compareTo(sequenceId) == 0) {
// We don't do the actually delete while we are
// iterating the BTree since
// iterating would fail.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1023704&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Mon Oct 18 09:27:43 2010
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+import javax.jms.*;
+import java.io.File;
+
+public class DurableSubscriptionOfflineTest extends
org.apache.activemq.TestSupport {
+
+ private BrokerService broker;
+ private ActiveMQTopic topic;
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
+ return new ActiveMQConnectionFactory("vm://" + getName());
+ }
+
+ @Override
+ protected Connection createConnection() throws Exception {
+ Connection con = super.createConnection();
+ con.setClientID("cliName");
+ con.start();
+ return con;
+ }
+
+ protected void setUp() throws Exception {
+ topic = (ActiveMQTopic) createDestination();
+ createBroker();
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ destroyBroker();
+ }
+
+ private void createBroker() throws Exception {
+ broker = BrokerFactory.createBroker("broker:(vm://localhost)");
+ broker.setBrokerName(getName());
+ broker.setDeleteAllMessagesOnStartup(true);
+
+ broker.setPersistent(true);
+ KahaDBPersistenceAdapter persistenceAdapter = new
KahaDBPersistenceAdapter();
+ persistenceAdapter.setDirectory(new File("activemq-data-kaha/" +
getName()));
+ broker.setPersistenceAdapter(persistenceAdapter);
+
+ broker.start();
+ }
+
+ private void destroyBroker() throws Exception {
+ if (broker != null)
+ broker.stop();
+ }
+
+ public void testOfflineSubscription() throws Exception {
+ // create durable subscription
+ Connection con = createConnection();
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session.createDurableSubscriber(topic, "SubsId", "filter = 'true'",
true);
+ session.close();
+ con.close();
+
+ // send messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(null);
+
+ int sent = 0;
+ for (int i = 0; i < 10; i++) {
+ boolean filter = i % 2 == 1;
+ if (filter)
+ sent++;
+
+ Message message = session.createMessage();
+ message.setStringProperty("filter", filter ? "true" : "false");
+ producer.send(topic, message);
+ }
+
+ session.close();
+ con.close();
+
+ // consume messages
+ con = createConnection();
+ session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createDurableSubscriber(topic,
"SubsId", "filter = 'true'", true);
+ Listener listener = new Listener();
+ consumer.setMessageListener(listener);
+
+ Thread.sleep(3 * 1000);
+
+ session.close();
+ con.close();
+
+ assertEquals(sent, listener.count);
+ }
+
+ public static class Listener implements MessageListener {
+ int count = 0;
+
+ public void onMessage(Message message) {
+ count++;
+ }
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date