Author: rajdavies Date: Fri Dec 29 12:21:10 2006 New Revision: 491089 URL: http://svn.apache.org/viewvc?view=rev&rev=491089 Log: clear last batch id if no more messages left to dispatch to a durable subscriber
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=491089&r1=491088&r2=491089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Fri Dec 29 12:21:10 2006 @@ -21,7 +21,6 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.kaha.ListContainer; @@ -29,10 +28,8 @@ import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreEntry; -import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.rapid.RapidMessageReference; /** * @version $Revision: 1.5 $ @@ -70,7 +67,7 @@ ConsumerMessageRef ref=new ConsumerMessageRef(); ref.setAckEntry(ackEntry); ref.setMessageEntry(messageEntry); - container.getListContainer().add(ref); + container.add(ref); } } } @@ -80,7 +77,10 @@ String subcriberId=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId); if(container!=null){ - ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst(); + ConsumerMessageRef ref=container.remove(); + if(container.isEmpty()){ + container.reset(); + } if(ref!=null){ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); if(tsa!=null){ @@ -112,7 +112,7 @@ if(!subscriberContainer.containsKey(key)){ subscriberContainer.put(key,info); } - //add the subscriber + // add the subscriber ListContainer container=addSubscriberMessageContainer(key); if(retroactive){ for(StoreEntry entry=ackContainer.getFirst();entry!=null;){ @@ -135,7 +135,7 @@ String key=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); if(container!=null){ - for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + for(Iterator i=container.iterator();i.hasNext();){ ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); Object msg=messageContainer.get(ref.getMessageEntry()); if(msg!=null){ @@ -158,14 +158,16 @@ int count=0; StoreEntry entry=container.getBatchEntry(); if(entry==null){ - entry=container.getListContainer().getFirst(); + entry=container.getEntry(); }else{ - entry=container.getListContainer().refresh(entry); - entry=container.getListContainer().getNext(entry); + entry=container.refreshEntry(entry); + if(entry!=null){ + entry=container.getNextEntry(entry); + } } if(entry!=null){ do{ - ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry); + ConsumerMessageRef consumerRef=container.get(entry); Object msg=messageContainer.get(consumerRef.getMessageEntry()); if(msg!=null){ if(msg.getClass()==String.class){ @@ -178,7 +180,7 @@ count++; } container.setBatchEntry(entry); - entry=container.getListContainer().getNext(entry); + entry=container.getNextEntry(entry); }while(entry!=null&&count<maxReturned&&listener.hasSpace()); } } @@ -210,11 +212,11 @@ subscriberMessages.put(key,tsc); return container; } - - protected void removeSubscriberMessageContainer(Object key) throws IOException { + + protected void removeSubscriberMessageContainer(Object key) throws IOException{ subscriberContainer.remove(key); TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key); - for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + for(Iterator i=container.iterator();i.hasNext();){ ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); if(ref!=null){ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); @@ -234,7 +236,7 @@ public int getMessageCount(String clientId,String subscriberName) throws IOException{ String key=getSubscriptionKey(clientId,subscriberName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); - return container.getListContainer().size(); + return container.size(); } /** @@ -251,8 +253,6 @@ messageContainer.add(messageRef); } - - /** * @param identity * @return String @@ -263,7 +263,6 @@ return null; } - /** * @param context * @throws IOException @@ -274,11 +273,10 @@ ackContainer.clear(); for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){ TopicSubContainer container=(TopicSubContainer)i.next(); - container.getListContainer().clear(); + container.clear(); } } - public synchronized void resetBatching(String clientId,String subscriptionName){ String key=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=491089&r1=491088&r2=491089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Fri Dec 29 12:21:10 2006 @@ -14,6 +14,7 @@ package org.apache.activemq.store.kahadaptor; +import java.util.Iterator; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.StoreEntry; @@ -44,22 +45,54 @@ this.batchEntry=batchEntry; } - /** - * @return the listContainer - */ - public ListContainer getListContainer(){ - return this.listContainer; + + public void reset() { + batchEntry = null; + } + + public boolean isEmpty() { + return listContainer.isEmpty(); } - /** - * @param listContainer the listContainer to set - */ - public void setListContainer(ListContainer container){ - this.listContainer=container; + public void add(ConsumerMessageRef ref) { + listContainer.add(ref); } - public void reset() { - batchEntry = null; + public ConsumerMessageRef remove() { + ConsumerMessageRef result = (ConsumerMessageRef)listContainer.removeFirst(); + if (listContainer.isEmpty()) { + reset(); + } + return result; + } + + public ConsumerMessageRef get(StoreEntry entry) { + return (ConsumerMessageRef)listContainer.get(entry); + } + + public StoreEntry getEntry() { + return listContainer.getFirst(); + } + + public StoreEntry refreshEntry(StoreEntry entry) { + return listContainer.refresh(entry); + } + + public StoreEntry getNextEntry(StoreEntry entry) { + return listContainer.getNext(entry); + } + + public Iterator iterator() { + return listContainer.iterator(); + } + + public int size() { + return listContainer.size(); + } + + public void clear() { + reset(); + listContainer.clear(); } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java?view=diff&rev=491089&r1=491088&r2=491089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java Fri Dec 29 12:21:10 2006 @@ -38,6 +38,9 @@ void removeMessage(MessageId id){ map.remove(id); + if (map.isEmpty()) { + lastBatch=null; + } } int size(){ Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=491089&r1=491088&r2=491089 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri Dec 29 12:21:10 2006 @@ -22,7 +22,6 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.kaha.ListContainer; @@ -78,7 +77,7 @@ ConsumerMessageRef ref=new ConsumerMessageRef(); ref.setAckEntry(ackEntry); ref.setMessageEntry(messageEntry); - container.getListContainer().add(ref); + container.add(ref); } } } @@ -88,7 +87,7 @@ String subcriberId=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId); if(container!=null){ - ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst(); + ConsumerMessageRef ref=(ConsumerMessageRef)container.remove(); if(ref!=null){ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); if(tsa!=null){ @@ -142,7 +141,7 @@ String key=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); if(container!=null){ - for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + for(Iterator i=container.iterator();i.hasNext();){ ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(ref .getMessageEntry()); @@ -163,14 +162,14 @@ int count=0; StoreEntry entry=container.getBatchEntry(); if(entry==null){ - entry=container.getListContainer().getFirst(); + entry=container.getEntry(); }else{ - entry=container.getListContainer().refresh(entry); - entry=container.getListContainer().getNext(entry); + entry=container.refreshEntry(entry); + entry=container.getNextEntry(entry); } if(entry!=null){ do{ - ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry); + ConsumerMessageRef consumerRef=container.get(entry); RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(consumerRef .getMessageEntry()); if(messageReference!=null){ @@ -179,7 +178,7 @@ count++; } container.setBatchEntry(entry); - entry=container.getListContainer().getNext(entry); + entry=container.getNextEntry(entry); }while(entry!=null&&count<maxReturned && listener.hasSpace()); } } @@ -210,7 +209,7 @@ protected void removeSubscriberMessageContainer(Object key) throws IOException { subscriberContainer.remove(key); TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key); - for(Iterator i=container.getListContainer().iterator();i.hasNext();){ + for(Iterator i=container.iterator();i.hasNext();){ ConsumerMessageRef ref=(ConsumerMessageRef)i.next(); if(ref!=null){ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); @@ -230,7 +229,7 @@ public int getMessageCount(String clientId,String subscriberName) throws IOException{ String key=getSubscriptionKey(clientId,subscriberName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key); - return container.getListContainer().size(); + return container.size(); } /** @@ -271,7 +270,7 @@ ackContainer.clear(); for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){ TopicSubContainer container=(TopicSubContainer)i.next(); - container.getListContainer().clear(); + container.clear(); } } @@ -294,7 +293,7 @@ String subcriberId=getSubscriptionKey(clientId,subscriptionName); TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId); if(container!=null){ - ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst(); + ConsumerMessageRef ref=(ConsumerMessageRef)container.remove(); if(ref!=null){ TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); if(tsa!=null){