Author: rajdavies Date: Thu Dec 28 13:03:53 2006 New Revision: 490814 URL: http://svn.apache.org/viewvc?view=rev&rev=490814 Log: Use the store-based cursor by default for Queues, which will enable support for very large queues
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=490814&r1=490813&r2=490814 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Dec 28 13:03:53 2006 @@ -38,6 +38,7 @@ import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import sun.security.x509.IssuerAlternativeNameExtension; import java.util.concurrent.ConcurrentHashMap; @@ -60,6 +61,7 @@ protected final TaskRunnerFactory taskRunnerFactory; protected final Object destinationsMutex = new Object(); protected final Map consumerChangeMutexMap = new HashMap(); + protected boolean started = false; public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager 
memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { if (broker == null) { @@ -76,9 +78,15 @@ } public void start() throws Exception { + started = true; + for (Iterator i = destinations.values().iterator();i.hasNext();) { + Destination dest = (Destination)i.next(); + dest.start(); + } } public void stop() throws Exception { + started = false; for (Iterator i = destinations.values().iterator();i.hasNext();) { Destination dest = (Destination)i.next(); dest.stop(); @@ -102,7 +110,7 @@ if (destinationInterceptor != null) { dest = destinationInterceptor.intercept(dest); } - + dest.start(); destinations.put(destination, dest); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=490814&r1=490813&r2=490814 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Thu Dec 28 13:03:53 2006 @@ -77,7 +77,7 @@ if (destination.isQueue()) { if (destination.isTemporary()) { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; - return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) { + return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) { public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { // Only consumers on the same connection can consume from @@ -90,7 +90,7 @@ }; } else { MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination); - Queue 
queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory); + Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()); configureQueue(queue, destination); queue.initialize(); return queue; Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=490814&r1=490813&r2=490814 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Dec 28 13:03:53 2006 @@ -19,22 +19,21 @@ import java.io.IOException; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.util.SubscriptionKey; -import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class DurableTopicSubscription extends PrefetchSubscription { - + static private final Log 
log=LogFactory.getLog(PrefetchSubscription.class); private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap(); private final ConcurrentHashMap destinations = new ConcurrentHashMap(); private final SubscriptionKey subscriptionKey; @@ -72,6 +71,7 @@ } public void activate(ConnectionContext context, ConsumerInfo info) throws Exception { + log.debug("Deactivating " + this); if( !active ) { this.active = true; this.context = context; @@ -96,7 +96,8 @@ } } - synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception { + synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception { + active=false; synchronized(pending){ pending.stop(); @@ -197,9 +198,12 @@ "DurableTopicSubscription:" + " consumer="+info.getConsumerId()+ ", destinations="+destinations.size()+ - ", dispatched="+dispatched.size()+ - ", delivered="+this.prefetchExtension+ - ", pending="+getPendingQueueSize(); + ", total="+enqueueCounter+ + ", pending="+getPendingQueueSize()+ + ", dispatched="+dispatchCounter+ + ", inflight="+dispatched.size()+ + ", prefetchExtension="+this.prefetchExtension; + } public String getClientId() { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=490814&r1=490813&r2=490814 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Dec 28 13:03:53 2006 @@ -327,6 +327,10 @@ return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9); } + public int countBeforeFull() { + return info.getPrefetchSize() + 
prefetchExtension - dispatched.size(); + } + public int getPendingQueueSize(){ synchronized(pending) { return pending.size(); @@ -396,28 +400,38 @@ List toDispatch=null; synchronized(pending){ try{ - pending.reset(); - while(pending.hasNext()&&!isFull()){ - MessageReference node=pending.next(); - pending.remove(); - // Message may have been sitting in the pending list a while - // waiting for the consumer to ak the message. - if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ - continue; // just drop it. + int numberToDispatch=countBeforeFull(); + if(numberToDispatch>0){ + int count=0; + pending.reset(); + while(pending.hasNext()&&!isFull()&&count<numberToDispatch){ + MessageReference node=pending.next(); + + if(canDispatch(node)){ + pending.remove(); + // Message may have been sitting in the pending list a while + // waiting for the consumer to ak the message. + if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ + continue; // just drop it. + } + if(toDispatch==null){ + toDispatch=new ArrayList(); + } + toDispatch.add(node); + count++; + } } - if(toDispatch==null){ - toDispatch=new ArrayList(); - } - toDispatch.add(node); } }finally{ pending.release(); } } if(toDispatch!=null){ - for(int i=0;i<toDispatch.size();i++){ - MessageReference node=(MessageReference)toDispatch.get(i); - dispatch(node); + synchronized(dispatched){ + for(int i=0;i<toDispatch.size();i++){ + MessageReference node=(MessageReference)toDispatch.get(i); + dispatch(node); + } } } }finally{ @@ -458,6 +472,7 @@ } return true; }else{ + QueueMessageReference n = (QueueMessageReference) node; return false; } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=490814&r1=490813&r2=490814 ============================================================================== --- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Dec 28 13:03:53 2006 @@ -28,6 +28,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.broker.region.cursors.StoreQueueCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupMap; @@ -44,6 +45,7 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.kaha.Store; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageRecoveryListener; @@ -74,7 +76,7 @@ private final Valve dispatchValve = new Valve(true); private final UsageManager usageManager; private final DestinationStatistics destinationStatistics = new DestinationStatistics(); - private PendingMessageCursor messages = new VMPendingMessageCursor(); + private PendingMessageCursor messages; private final LinkedList pagedInMessages = new LinkedList(); private LockOwner exclusiveOwner; @@ -92,13 +94,20 @@ private final Object exclusiveLockMutex = new Object(); private final Object doDispatchMutex = new Object(); private TaskRunner taskRunner; + private boolean started = false; public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, - TaskRunnerFactory taskFactory) throws Exception { + TaskRunnerFactory taskFactory, Store tmpStore) throws Exception { this.destination = destination; this.usageManager = new UsageManager(memoryManager); 
this.usageManager.setLimit(Long.MAX_VALUE); this.store = store; + if(destination.isTemporary()){ + this.messages=new VMPendingMessageCursor(); + }else{ + this.messages=new StoreQueueCursor(this,tmpStore); + } + this.taskRunner = taskFactory.createTaskRunner(this, "Queue "+destination.getPhysicalName()); // Let the store know what usage manager we are using so that he can @@ -118,18 +127,16 @@ if(store!=null){ // Restore the persistent messages. messages.setUsageManager(getUsageManager()); - messages.start(); if(messages.isRecoveryRequired()){ store.recover(new MessageRecoveryListener(){ public void recoverMessage(Message message){ - // Message could have expired while it was being loaded.. - if( message.isExpired() ) { - // TODO: remove message from store. - return; - } - - message.setRegionDestination(Queue.this); + // Message could have expired while it was being loaded.. + if(message.isExpired()){ + // TODO remove from store + return; + } + message.setRegionDestination(Queue.this); synchronized(messages){ try{ messages.addMessageLast(message); @@ -157,10 +164,12 @@ /** * Lock a node + * * @param node * @param lockOwner * @return true if can be locked - * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.broker.region.LockOwner) + * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference, + * org.apache.activemq.broker.region.LockOwner) */ public boolean lock(MessageReference node,LockOwner lockOwner){ synchronized(exclusiveLockMutex){ @@ -309,46 +318,60 @@ } public void send(final ConnectionContext context,final Message message) throws Exception{ - // There is delay between the client sending it and it arriving at the - // destination.. it may have expired. - if( message.isExpired() ) { - return; - } - + // There is delay between the client sending it and it arriving at the + // destination.. it may have expired. 
+ if(message.isExpired()){ + if (log.isDebugEnabled()) { + log.debug("Expired message: " + message); + } + return; + } if(context.isProducerFlowControl()){ if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){ throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached"); }else{ usageManager.waitForSpace(); - // The usage manager could have delayed us by the time // we unblock the message could have expired.. - if( message.isExpired() ) { - return; - } + if(message.isExpired()){ + if (log.isDebugEnabled()) { + log.debug("Expired message: " + message); + } + return; + } } } message.setRegionDestination(this); - if (store != null && message.isPersistent()) { - store.addMessage(context, message); + if(store!=null&&message.isPersistent()){ + store.addMessage(context,message); } if(context.isInTransaction()){ context.getTransaction().addSynchronization(new Synchronization(){ public void afterCommit() throws Exception{ - - // It could take while before we receive the commit - // operration.. by that time the message could have expired.. - if( message.isExpired() ) { - // TODO: remove message from store. - return; - } - + //even though the message could be expired - it won't be from the store + //and it's important to keep the store/cursor in step + synchronized(messages){ + messages.addMessageLast(message); + } + // It could take while before we receive the commit + // operration.. by that time the message could have expired.. + if(message.isExpired()){ + // TODO: remove message from store. 
+ if (log.isDebugEnabled()) { + log.debug("Expired message: " + message); + } + return; + } sendMessage(context,message); } }); }else{ + synchronized(messages){ + messages.addMessageLast(message); + } sendMessage(context,message); + } } @@ -432,12 +455,19 @@ } public void start() throws Exception { + started = true; + messages.start(); + doPageIn(false); } public void stop() throws Exception { + started = false; if( taskRunner!=null ) { taskRunner.shutdown(); } + if(messages!=null){ + messages.stop(); + } } // Properties @@ -528,6 +558,11 @@ public Message[] browse() { ArrayList l = new ArrayList(); + try{ + doPageIn(true); + }catch(Exception e){ + log.error("caught an exception browsing " + this,e); + } synchronized(pagedInMessages) { for (Iterator i = pagedInMessages.iterator();i.hasNext();) { MessageReference r = (MessageReference)i.next(); @@ -538,7 +573,7 @@ l.add(m); } }catch(IOException e){ - log.error("caught an exception brwsing " + this,e); + log.error("caught an exception browsing " + this,e); } finally { r.decrementReferenceCount(); @@ -850,11 +885,10 @@ return answer; } + private void sendMessage(final ConnectionContext context,Message msg) throws Exception{ - synchronized(messages){ - messages.addMessageLast(msg); - } + destinationStatistics.getEnqueues().increment(); destinationStatistics.getMessages().increment(); pageInMessages(false); @@ -863,10 +897,11 @@ private List doPageIn() throws Exception{ return doPageIn(true); } + private List doPageIn(boolean force) throws Exception{ final int toPageIn=maximumPagedInMessages-pagedInMessages.size(); List result=null; - if((force || !consumers.isEmpty())&&toPageIn>0){ + if((force||!consumers.isEmpty())&&toPageIn>0){ try{ dispatchValve.increment(); int count=0; @@ -877,9 +912,15 @@ while(messages.hasNext()&&count<toPageIn){ MessageReference node=messages.next(); messages.remove(); - node=createMessageReference(node.getMessage()); - result.add(node); - count++; + if(!node.isExpired()){ + 
node=createMessageReference(node.getMessage()); + result.add(node); + count++; + }else{ + if (log.isDebugEnabled()) { + log.debug("Expired message: " + node); + } + } } }finally{ messages.release(); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?view=diff&rev=490814&r1=490813&r2=490814 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Thu Dec 28 13:03:53 2006 @@ -73,7 +73,7 @@ protected boolean canDispatch(MessageReference n) throws IOException { QueueMessageReference node = (QueueMessageReference) n; - if( node.isAcked() ) + if( node.isAcked()) return false; // Keep message groups together. 
String groupId = node.getGroupID(); @@ -208,7 +208,7 @@ /** */ - synchronized public void destroy() { + public void destroy() { } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=490814&r1=490813&r2=490814 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Dec 28 13:03:53 2006 @@ -78,7 +78,7 @@ private final Region tempQueueRegion; private final Region tempTopicRegion; private BrokerService brokerService; - private boolean stopped = false; + private boolean started = false; private boolean keepDurableSubsActive=false; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); @@ -178,6 +178,7 @@ public void start() throws Exception { ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive); + started = true; queueRegion.start(); topicRegion.start(); tempQueueRegion.start(); @@ -185,7 +186,7 @@ } public void stop() throws Exception { - stopped = true; + started = false; ServiceStopper ss = new ServiceStopper(); doStop(ss); ss.throwFirstException(); @@ -245,7 +246,6 @@ if( destinations.contains(destination) ){ throw new JMSException("Destination already exists: "+destination); } - Destination answer = null; switch(destination.getDestinationType()) { case ActiveMQDestination.QUEUE_TYPE: @@ -366,7 +366,8 @@ } public void send(ConnectionContext context, Message message) throws Exception { - message.getMessageId().setBrokerSequenceId(sequenceGenerator.getNextSequenceId()); + long si = sequenceGenerator.getNextSequenceId(); + 
message.getMessageId().setBrokerSequenceId(si); if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) { //timestamp not been disabled and has not passed through a network message.setTimestamp(System.currentTimeMillis()); @@ -541,7 +542,7 @@ } public boolean isStopped(){ - return stopped; + return !started; } public Set getDurableDestinations(){