Author: rajdavies Date: Fri Nov 24 11:57:51 2006 New Revision: 478967 URL: http://svn.apache.org/viewvc?view=rev&rev=478967 Log: implementation of store based cursors for Queues and Durable Subscribers, to fix: http://issues.apache.org/activemq/browse/AMQ-845 http://issues.apache.org/activemq/browse/AMQ-1062 http://issues.apache.org/activemq/browse/AMQ-1061 http://issues.apache.org/activemq/browse/AMQ-493 http://issues.apache.org/activemq/browse/AMQ-914
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java (with props) incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java (with props) incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java (with props) Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri Nov 24 11:57:51 2006 @@ -427,6 +427,10 @@ public void recoverMessageReference(String messageReference) throws Exception{} public void finished(){} + + public boolean hasSpace(){ + return true; + } }); }catch(Throwable e){ log.error("Failed to browse messages for Subscription "+view,e); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Nov 24 11:57:51 2006 @@ -117,6 +117,7 @@ public void initialize() throws Exception{ if(store!=null){ // Restore the persistent messages. + messages.setUsageManager(getUsageManager()); messages.start(); if(messages.isRecoveryRequired()){ store.recover(new MessageRecoveryListener(){ @@ -145,6 +146,10 @@ public void finished(){ } + + public boolean hasSpace(){ + return true; + } }); } } @@ -242,6 +247,9 @@ synchronized (consumers) { consumers.remove(sub); + if (consumers.isEmpty()) { + messages.gc(); + } } sub.remove(context, this); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Nov 24 11:57:51 2006 @@ -203,6 +203,10 @@ } public void finished(){} + + public boolean hasSpace(){ + return true; + } }); } @@ -334,6 +338,10 @@ public void recoverMessageReference(String messageReference) throws Exception{} public void finished(){} + + public boolean hasSpace(){ + return true; + } }); Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination()); if(msgs!=null){ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Fri Nov 24 11:57:51 2006 @@ -219,6 +219,7 @@ PendingMessageCursor cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor( context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(), info.getPrefetchSize()); + cursor.setUsageManager(memoryManager); sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor); durableSubscriptions.put(key,sub); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Fri Nov 24 11:57:51 2006 @@ -14,10 +14,10 @@ package org.apache.activemq.broker.region.cursors; -import java.io.IOException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.memory.UsageManager; /** * Abstract method holder for pending message (messages awaiting disptach to a consumer) cursor @@ -27,11 +27,13 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{ protected int maxBatchSize=100; + protected UsageManager usageManager; public void start() throws Exception{ } public void stop() throws Exception{ + gc(); } public void add(ConnectionContext context,Destination destination) throws Exception{ @@ -86,17 +88,22 @@ protected void fillBatch() throws Exception{ } - /** - * Give the cursor a hint that we are about to remove messages from memory only - */ public void resetForGC(){ reset(); } - /** - * @param node - * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) - */ public void remove(MessageReference node){ + } + + public void gc(){ + } + + + public void setUsageManager(UsageManager usageManager){ + this.usageManager = usageManager; + } + + public boolean hasSpace() { + return usageManager != null ? !usageManager.isFull() : true; } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Fri Nov 24 11:57:51 2006 @@ -19,6 +19,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.memory.UsageManager; /** * Interface to pending message (messages awaiting disptach to a consumer) cursor @@ -125,4 +126,18 @@ * @param node */ public void remove(MessageReference node); + + + /** + * free up any internal buffers + * + */ + public void gc(); + + /** + * Set the UsageManager + * @param usageManager + * @see org.apache.activemq.memory.UsageManager + */ + public void setUsageManager(UsageManager usageManager); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Fri Nov 24 11:57:51 2006 @@ -20,16 +20,12 @@ import java.io.IOException; import java.util.LinkedList; -import javax.jms.JMSException; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.TopicMessageStore; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +61,7 @@ public void stop() throws Exception{ store.resetBatching(); + gc(); } /** @@ -124,6 +121,10 @@ throws Exception{ // shouldn't get called throw new RuntimeException("Not supported"); + } + + public void gc() { + batchList.clear(); } // implementation Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Fri Nov 24 11:57:51 2006 @@ -25,6 +25,7 @@ import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; import org.apache.activemq.kaha.Store; +import org.apache.activemq.memory.UsageManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -86,6 +87,7 @@ public synchronized void add(ConnectionContext context,Destination destination) throws Exception{ TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName); tsp.setMaxBatchSize(getMaxBatchSize()); + tsp.setUsageManager(usageManager); topics.put(destination,tsp); storePrefetches.add(tsp); if(started){ @@ -200,6 +202,21 @@ tsp.setMaxBatchSize(maxBatchSize); } super.setMaxBatchSize(maxBatchSize); + } + + public synchronized void gc() { + for(Iterator i=storePrefetches.iterator();i.hasNext();){ + PendingMessageCursor tsp=(PendingMessageCursor)i.next(); + tsp.gc(); + } + } + + public synchronized void setUsageManager(UsageManager usageManager){ + super.setUsageManager(usageManager); + for(Iterator i=storePrefetches.iterator();i.hasNext();){ + PendingMessageCursor tsp=(PendingMessageCursor)i.next(); + tsp.setUsageManager(usageManager); + } } protected synchronized PendingMessageCursor getNextCursor() throws Exception{ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Fri Nov 24 11:57:51 2006 @@ -14,11 +14,11 @@ package org.apache.activemq.broker.region.cursors; -import java.util.Iterator; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.Message; import org.apache.activemq.kaha.Store; +import org.apache.activemq.memory.UsageManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,6 +55,7 @@ if(nonPersistent==null){ nonPersistent=new FilePendingMessageCursor(queue.getDestination(),tmpStore); nonPersistent.setMaxBatchSize(getMaxBatchSize()); + nonPersistent.setUsageManager(usageManager); } nonPersistent.start(); persistent.start(); @@ -65,8 +66,10 @@ started=false; if(nonPersistent!=null){ nonPersistent.stop(); + nonPersistent.gc(); } persistent.stop(); + persistent.gc(); pendingCount=0; } @@ -162,10 +165,29 @@ } super.setMaxBatchSize(maxBatchSize); } + + public void gc() { + if (persistent != null) { + persistent.gc(); + } + if (nonPersistent != null) { + nonPersistent.gc(); + } + } + + public void setUsageManager(UsageManager usageManager){ + super.setUsageManager(usageManager); + if (persistent != null) { + persistent.setUsageManager(usageManager); + } + if (nonPersistent != null) { + nonPersistent.setUsageManager(usageManager); + } + } protected synchronized PendingMessageCursor getNextCursor() throws Exception{ if(currentCursor==null||currentCursor.isEmpty()){ - currentCursor = currentCursor == persistent ? nonPersistent : persistent; + currentCursor=currentCursor==persistent?nonPersistent:persistent; } return currentCursor; } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Fri Nov 24 11:57:51 2006 @@ -66,6 +66,7 @@ public void stop() throws Exception{ store.resetBatching(clientId,subscriberName); + gc(); } /** @@ -136,6 +137,10 @@ Message message=(Message)batchList.getLast(); } + } + + public void gc() { + batchList.clear(); } public String toString() { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java Fri Nov 24 11:57:51 2006 @@ -26,4 +26,5 @@ void recoverMessage(Message message) throws Exception; void recoverMessageReference(String messageReference) throws Exception; void finished(); + boolean hasSpace(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri Nov 24 11:57:51 2006 @@ -235,14 +235,18 @@ new JDBCMessageRecoveryListener(){ public void recoverMessage(long sequenceId,byte[] data) throws Exception{ - Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data)); - msg.getMessageId().setBrokerSequenceId(sequenceId); - listener.recoverMessage(msg); - lastMessageId.set(sequenceId); + if(listener.hasSpace()){ + Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data)); + msg.getMessageId().setBrokerSequenceId(sequenceId); + listener.recoverMessage(msg); + lastMessageId.set(sequenceId); + } } public void recoverMessageReference(String reference) throws Exception{ - listener.recoverMessageReference(reference); + if(listener.hasSpace()) { + listener.recoverMessageReference(reference); + } } public void finished(){ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Nov 24 11:57:51 2006 @@ -108,10 +108,12 @@ new JDBCMessageRecoveryListener(){ public void recoverMessage(long sequenceId,byte[] data) throws Exception{ - Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data)); - msg.getMessageId().setBrokerSequenceId(sequenceId); - listener.recoverMessage(msg); - finalLast.set(sequenceId); + if(listener.hasSpace()){ + Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data)); + msg.getMessageId().setBrokerSequenceId(sequenceId); + listener.recoverMessage(msg); + finalLast.set(sequenceId); + } } public void recoverMessageReference(String reference) throws Exception{ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Nov 24 11:57:51 2006 @@ -370,6 +370,7 @@ ResultSet rs=null; try{ s=c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement()); + s.setMaxRows(maxReturned); s.setString(1,destination.getQualifiedName()); s.setString(2,clientId); s.setString(3,subscriptionName); @@ -639,8 +640,9 @@ ResultSet rs=null; try{ s=c.getConnection().prepareStatement(statements.getFindNextMessagesStatement()); + s.setMaxRows(maxReturned); s.setString(1,destination.getQualifiedName()); - s.setLong(4,nextSeq); + s.setLong(2,nextSeq); rs=s.executeQuery(); int count=0; if(statements.isUseExternalMessageReferences()){ @@ -654,7 +656,9 @@ count++; } } - }finally{ + }catch(Exception e) { + e.printStackTrace(); + }finally { close(rs); close(s); listener.finished(); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java Fri Nov 24 11:57:51 2006 @@ -373,6 +373,10 @@ public void finished(){ listener.finished(); } + public boolean hasSpace(){ + // TODO Auto-generated method stub + return true; + } }); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java Fri Nov 24 11:57:51 2006 @@ -68,6 +68,9 @@ public void finished(){ listener.finished(); } + public boolean hasSpace(){ + return true; + } }); } @@ -86,6 +89,9 @@ public void finished(){ listener.finished(); + } + public boolean hasSpace(){ + return true; } }); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Fri Nov 24 11:57:51 2006 @@ -210,7 +210,7 @@ } batchEntry = entry; entry=messageContainer.getNext(entry); - }while(entry!=null&&count<maxReturned); + }while(entry!=null&&count<maxReturned&&listener.hasSpace()); } listener.finished(); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Fri Nov 24 11:57:51 2006 @@ -184,7 +184,7 @@ } container.setBatchEntry(entry); entry=container.getListContainer().getNext(entry); - }while(entry!=null&&count<maxReturned); + }while(entry!=null&&count<maxReturned&&listener.hasSpace()); } } listener.finished(); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java Fri Nov 24 11:57:51 2006 @@ -350,7 +350,7 @@ } batchEntry=entry; entry=messageContainer.getNext(entry); - }while(entry!=null&&count<maxReturned); + }while(entry!=null&&count<maxReturned&&listener.hasSpace()); } listener.finished(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri Nov 24 11:57:51 2006 @@ -186,7 +186,7 @@ } container.setBatchEntry(entry); entry=container.getListContainer().getNext(entry); - }while(entry!=null&&count<maxReturned); + }while(entry!=null&&count<maxReturned && listener.hasSpace()); } } listener.finished(); Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java Fri Nov 24 11:57:51 2006 @@ -28,7 +28,7 @@ public void testManyProducersManyConsumers() throws Exception { consumerCount = 20; producerCount = 20; - messageCount = 500; + messageCount = 50; messageSize = 1; prefetchCount = 10; Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java?view=diff&rev=478967&r1=478966&r2=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java Fri Nov 24 11:57:51 2006 @@ -1,220 +1,56 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.broker.region.cursors; -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; import javax.jms.Topic; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; -import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; + /** * @version $Revision: 1.3 $ */ -public class CursorDurableTest extends TestCase{ - - protected static final Log log = LogFactory.getLog(CursorDurableTest.class); - - protected static final int MESSAGE_COUNT=100; - protected static final int PREFETCH_SIZE = 5; - protected BrokerService broker; - protected String bindAddress="tcp://localhost:60706"; - protected int topicCount=0; - - public void testSendFirstThenConsume() throws Exception{ - ConnectionFactory factory=createConnectionFactory(); - Connection consumerConnection= getConsumerConnection(factory); - //create durable subs - MessageConsumer consumer = getConsumer(consumerConnection); - consumerConnection.close(); - - Connection producerConnection = factory.createConnection(); - producerConnection.start(); - Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(getTopic(session)); - List senderList = new ArrayList(); - for (int i =0; i < MESSAGE_COUNT; i++) { - Message msg=session.createTextMessage("test"+i); - senderList.add(msg); - producer.send(msg); - } - producerConnection.close(); - - //now consume the messages - consumerConnection= getConsumerConnection(factory); - //create durable subs - consumer = getConsumer(consumerConnection); - List consumerList = new ArrayList(); - for (int i = 0; i < MESSAGE_COUNT; i++) { - Message msg = consumer.receive(); - consumerList.add(msg); - } - assertEquals(senderList,consumerList); - consumerConnection.close(); - } - - public void testSendWhilstConsume() throws Exception{ - ConnectionFactory factory=createConnectionFactory(); - Connection consumerConnection= getConsumerConnection(factory); - //create durable subs - MessageConsumer consumer = getConsumer(consumerConnection); - consumerConnection.close(); - - Connection producerConnection = factory.createConnection(); - producerConnection.start(); - Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(getTopic(session)); - List senderList = new ArrayList(); - for (int i =0; i < MESSAGE_COUNT/10; i++) { - TextMessage msg=session.createTextMessage("test"+i); - senderList.add(msg); - producer.send(msg); - } - - - //now consume the messages - consumerConnection= getConsumerConnection(factory); - //create durable subs - consumer = getConsumer(consumerConnection); - final List consumerList = new ArrayList(); - - final CountDownLatch latch = new CountDownLatch(1); - consumer.setMessageListener(new MessageListener() { - - public void onMessage(Message msg){ - try{ - //sleep to act as a slow consumer - //which will force a mix of direct and polled dispatching - //using the cursor on the broker - Thread.sleep(50); - }catch(Exception e){ - // TODO Auto-generated catch block - e.printStackTrace(); - } - consumerList.add(msg); - if (consumerList.size()==MESSAGE_COUNT) { - latch.countDown(); - } - - } - - }); - for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) { - TextMessage msg=session.createTextMessage("test"+i); - senderList.add(msg); - - producer.send(msg); - - - } - - - latch.await(300000,TimeUnit.MILLISECONDS); - assertEquals("Still dipatching - count down latch not sprung" , latch.getCount(),0); - assertEquals("cosumerList - expected: " + MESSAGE_COUNT + " but was: " + consumerList.size(),consumerList.size(),senderList.size()); - assertEquals(senderList,consumerList); - producerConnection.close(); - consumerConnection.close(); - } - - +public class CursorDurableTest extends CursorSupport{ - protected Topic getTopic(Session session) throws JMSException{ + protected Destination getDestination(Session session) throws JMSException{ String topicName=getClass().getName(); return session.createTopic(topicName); } - + protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{ Connection connection=fac.createConnection(); connection.setClientID("testConsumer"); connection.start(); return connection; - } - + protected MessageConsumer getConsumer(Connection connection) throws Exception{ - Session consumerSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - Topic topic = getTopic(consumerSession); - MessageConsumer consumer = consumerSession.createDurableSubscriber(topic,"testConsumer"); + Session consumerSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Topic topic=(Topic)getDestination(consumerSession); + MessageConsumer consumer=consumerSession.createDurableSubscriber(topic,"testConsumer"); return consumer; } - - - - protected void setUp() throws Exception{ - if(broker==null){ - broker=createBroker(); - } - super.setUp(); - } - - protected void tearDown() throws Exception{ - super.tearDown(); - - if(broker!=null){ - broker.stop(); - } - } - - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{ - ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress); - Properties props = new Properties(); - props.setProperty("prefetchPolicy.durableTopicPrefetch","" + PREFETCH_SIZE); - props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch","" + PREFETCH_SIZE); - cf.setProperties(props); - return cf; - } - - - protected BrokerService createBroker() throws Exception{ - BrokerService answer=new BrokerService(); - configureBroker(answer); + protected void configureBroker(BrokerService answer) throws Exception{ answer.setDeleteAllMessagesOnStartup(true); answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy()); - answer.start(); - return answer; - } - - protected void configureBroker(BrokerService answer) throws Exception{ - answer.addConnector(bindAddress); answer.setDeleteAllMessagesOnStartup(true); } Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java?view=auto&rev=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java Fri Nov 24 11:57:51 2006 @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.activemq.broker.region.cursors; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy; + +/** + * @version $Revision: 1.3 $ + */ +public class CursorQueueStoreTest extends CursorSupport{ + + protected Destination getDestination(Session session) throws JMSException{ + String queueName="QUEUE" + getClass().getName(); + return session.createQueue(queueName); + } + + protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{ + Connection connection=fac.createConnection(); + connection.setClientID("testConsumer"); + connection.start(); + return connection; + } + + protected MessageConsumer getConsumer(Connection connection) throws Exception{ + Session consumerSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Destination dest = getDestination(consumerSession); + MessageConsumer consumer=consumerSession.createConsumer(dest); + return consumer; + } + + + protected void configureBroker(BrokerService answer) throws Exception{ + PolicyEntry policy = new PolicyEntry(); + policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy()); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + answer.setDestinationPolicy(pMap); + answer.setDeleteAllMessagesOnStartup(true); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java?view=auto&rev=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java Fri Nov 24 11:57:51 2006 @@ -0,0 +1,175 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.activemq.broker.region.cursors; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @version $Revision: 1.3 $ + */ +public abstract class CursorSupport extends TestCase{ + + protected static final Log log=LogFactory.getLog(CursorSupport.class); + protected static final int MESSAGE_COUNT=500; + protected static final int PREFETCH_SIZE=50; + protected BrokerService broker; + protected String bindAddress="tcp://localhost:60706"; + + protected abstract Destination getDestination(Session session) throws JMSException; + + protected abstract MessageConsumer getConsumer(Connection connection) throws Exception; + + protected abstract void configureBroker(BrokerService answer) throws Exception; + + public void testSendFirstThenConsume() throws Exception{ + ConnectionFactory factory=createConnectionFactory(); + Connection consumerConnection=getConsumerConnection(factory); + MessageConsumer consumer=getConsumer(consumerConnection); + consumerConnection.close(); + Connection producerConnection=factory.createConnection(); + producerConnection.start(); + Session session=producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageProducer producer=session.createProducer(getDestination(session)); + List senderList=new ArrayList(); + for(int i=0;i<MESSAGE_COUNT;i++){ + Message msg=session.createTextMessage("test"+i); + senderList.add(msg); + producer.send(msg); + } + producerConnection.close(); + // now consume the messages + consumerConnection=getConsumerConnection(factory); + // create durable subs + consumer=getConsumer(consumerConnection); + List consumerList=new ArrayList(); + for(int i=0;i<MESSAGE_COUNT;i++){ + Message msg=consumer.receive(); + consumerList.add(msg); + } + assertEquals(senderList,consumerList); + consumerConnection.close(); + } + + public void testSendWhilstConsume() throws Exception{ + ConnectionFactory factory=createConnectionFactory(); + Connection consumerConnection=getConsumerConnection(factory); + // create durable subs + MessageConsumer consumer=getConsumer(consumerConnection); + consumerConnection.close(); + Connection producerConnection=factory.createConnection(); + producerConnection.start(); + Session session=producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageProducer producer=session.createProducer(getDestination(session)); + List senderList=new ArrayList(); + for(int i=0;i<MESSAGE_COUNT/10;i++){ + TextMessage msg=session.createTextMessage("test"+i); + senderList.add(msg); + producer.send(msg); + } + // now consume the messages + consumerConnection=getConsumerConnection(factory); + // create durable subs + consumer=getConsumer(consumerConnection); + final List consumerList=new ArrayList(); + final CountDownLatch latch=new CountDownLatch(1); + consumer.setMessageListener(new MessageListener(){ + + public void onMessage(Message msg){ + try{ + // sleep to act as a slow consumer + // which will force a mix of direct and polled dispatching + // using the cursor on the broker + Thread.sleep(50); + }catch(Exception e){ + // TODO Auto-generated catch block + e.printStackTrace(); + } + consumerList.add(msg); + if(consumerList.size()==MESSAGE_COUNT){ + latch.countDown(); + } + } + }); + for(int i=MESSAGE_COUNT/10;i<MESSAGE_COUNT;i++){ + TextMessage msg=session.createTextMessage("test"+i); + senderList.add(msg); + producer.send(msg); + } + latch.await(300000,TimeUnit.MILLISECONDS); + assertEquals("Still dipatching - count down latch not sprung",latch.getCount(),0); + assertEquals("cosumerList - expected: "+MESSAGE_COUNT+" but was: "+consumerList.size(),consumerList.size(), + senderList.size()); + assertEquals(senderList,consumerList); + producerConnection.close(); + consumerConnection.close(); + } + + protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{ + Connection connection=fac.createConnection(); + connection.setClientID("testConsumer"); + connection.start(); + return connection; + } + + protected void setUp() throws Exception{ + if(broker==null){ + broker=createBroker(); + } + super.setUp(); + } + + protected void tearDown() throws Exception{ + super.tearDown(); + if(broker!=null){ + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{ + ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress); + Properties props=new Properties(); + props.setProperty("prefetchPolicy.durableTopicPrefetch",""+PREFETCH_SIZE); + props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch",""+PREFETCH_SIZE); + props.setProperty("prefetchPolicy.queuePrefetch",""+PREFETCH_SIZE); + cf.setProperties(props); + return cf; + } + + protected BrokerService createBroker() throws Exception{ + BrokerService answer=new BrokerService(); + configureBroker(answer); + answer.start(); + return answer; + } +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java?view=auto&rev=478967 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java Fri Nov 24 11:57:51 2006 @@ -0,0 +1,48 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.io.File; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +/** + * @version $Revision: 1.3 $ + */ +public class KahaQueueStoreTest extends CursorQueueStoreTest{ + + protected static final Log log = LogFactory.getLog(KahaQueueStoreTest.class); + + + + protected void configureBroker(BrokerService answer) throws Exception{ + KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest")); + answer.setPersistenceAdapter(adaptor); + PolicyEntry policy = new PolicyEntry(); + policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy()); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + answer.setDestinationPolicy(pMap); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java ------------------------------------------------------------------------------ svn:eol-style = native