Author: gtully
Date: Wed Feb 4 15:19:56 2009
New Revision: 740765
URL: http://svn.apache.org/viewvc?rev=740765&view=rev
Log:
Move setBatch to the MessageStore interface to keep cursors store-agnostic -
http://issues.apache.org/activemq/browse/AMQ-2020 - some store-specific tests
to follow.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Wed Feb 4 15:19:56 2009
@@ -75,15 +75,7 @@
}
protected void setBatch(MessageId messageId) {
- AMQMessageStore amqStore = (AMQMessageStore) store;
- try {
- amqStore.flush();
- } catch (InterruptedIOException e) {
- LOG.debug("flush on setBatch resulted in exception", e);
- }
- KahaReferenceStore kahaStore =
- (KahaReferenceStore) amqStore.getReferenceStore();
- kahaStore.setBatch(messageId);
+ store.setBatch(messageId);
batchResetNeeded = false;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
Wed Feb 4 15:19:56 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.store;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.usage.MemoryUsage;
@@ -42,4 +43,7 @@
public void setMemoryUsage(MemoryUsage memoryUsage) {
}
+
+ public void setBatch(MessageId messageId) {
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
Wed Feb 4 15:19:56 2009
@@ -110,4 +110,11 @@
void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;
void dispose(ConnectionContext context);
+
+ /**
+ * allow caching cursors to set the current batch offset when cache is exhausted
+ * @param messageId
+ */
+ void setBatch(MessageId messageId);
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
Wed Feb 4 15:19:56 2009
@@ -92,4 +92,8 @@
delegate.resetBatching();
}
+
+ public void setBatch(MessageId messageId) {
+ delegate.setBatch(messageId);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Wed Feb 4 15:19:56 2009
@@ -134,4 +134,8 @@
delegate.resetBatching();
}
+
+ public void setBatch(MessageId messageId) {
+ delegate.setBatch(messageId);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Wed Feb 4 15:19:56 2009
@@ -558,4 +558,14 @@
referenceStore.dispose(context);
super.dispose(context);
}
+
+ public void setBatch(MessageId messageId) {
+ try {
+ flush();
+ } catch (InterruptedIOException e) {
+ LOG.debug("flush on setBatch resulted in exception", e);
+ }
+ getReferenceStore().setBatch(messageId);
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
Wed Feb 4 15:19:56 2009
@@ -46,6 +46,7 @@
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -70,16 +71,20 @@
final int ackWindow = 50;
final int ackBatchSize = 50;
final int fullWindow = 200;
- final int count = 20000;
+ protected int count = 20000;
public void setUp() throws Exception {
- brokerService = new BrokerService();
+ brokerService = createBroker();
brokerService.setUseJmx(false);
brokerService.deleteAllMessages();
brokerService.start();
}
- public void tearDown() throws Exception {
+ protected BrokerService createBroker() throws Exception {
+ return new BrokerService();
+ }
+
+ public void tearDown() throws Exception {
brokerService.stop();
}
@@ -92,8 +97,7 @@
}
public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
- final AMQPersistenceAdapter persistenceAdapter =
- (AMQPersistenceAdapter) brokerService.getPersistenceAdapter();
+ final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
final MessageStore queueMessageStore = persistenceAdapter.createQueueMessageStore(destination);
final ConnectionContext contextNotInTx = new ConnectionContext();
@@ -127,10 +131,9 @@
Message message = getMessage(i);
queue.send(producerExchange, message);
}
-
- assertEquals("store count is correct", count, queueMessageStore
- .getMessageCount());
+ assertEquals("store count is correct", count, queueMessageStore.getMessageCount());
+
// pull from store in small windows
Subscription subscription = new Subscription() {
@@ -305,7 +308,6 @@
if (removeIndex % 1000 == 0) {
LOG.info("acked: " + removeIndex);
persistenceAdapter.checkpoint(true);
- persistenceAdapter.cleanup();
}
}
}