Author: rajdavies Date: Mon Nov 27 05:40:11 2006 New Revision: 479614 URL: http://svn.apache.org/viewvc?view=rev&rev=479614 Log: support for durable store cursors and retroactive subscribers
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java (with props) Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Mon Nov 27 05:40:11 2006 @@ -159,4 +159,29 @@ public boolean isRecoveryRequired(){ return true; } + + public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception{ + boolean result = false; + MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); + try { + msgContext.setDestination(message.getRegionDestination().getActiveMQDestination()); + msgContext.setMessageReference(message); 
+ result = matches(message,msgContext); + if (result) { + doAddRecoveredMessage(message); + } + + }finally { + msgContext.clear(); + } + return result; + } + + public ActiveMQDestination getActiveMQDestination() { + return info != null ? info.getDestination() : null; + } + + protected void doAddRecoveredMessage(MessageReference message) throws Exception { + add(message); + } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Nov 27 05:40:11 2006 @@ -152,6 +152,10 @@ super.add(node); } + protected void doAddRecoveredMessage(MessageReference message) throws Exception { + pending.addRecoveredMessage(message); + } + public int getPendingQueueSize() { if( active || keepDurableSubsActive ) { return super.getPendingQueueSize(); @@ -218,5 +222,7 @@ } dispatched.clear(); } + + } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Nov 27 05:40:11 2006 @@ -390,7 +390,7 @@ // Message may have been sitting in the pending list a while // waiting for the consumer to ak the message. - if( node.isExpired() ) { + if( node != QueueMessageReference.NULL_MESSAGE && node.isExpired() ) { continue; // just drop it. } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Mon Nov 27 05:40:11 2006 @@ -34,7 +34,7 @@ /** * @version $Revision: 1.5 $ */ -public interface Subscription { +public interface Subscription extends SubscriptionRecovery { /** * Used to add messages that match the subscription. Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java?view=auto&rev=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java Mon Nov 27 05:40:11 2006 @@ -0,0 +1,49 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.ActiveMQDestination; + +/** + * An interface for recoverying transient messages held by the broker + * for retractive recovery for subscribers + * + * @version $Revision$ + */ +public interface SubscriptionRecovery { + + + /** + * Add a message to the SubscriptionRecovery + * @param context + * @param message + * @return true if the message is accepted + * @throws Exception + */ + boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception; + + + /** + * @return the Destination associated with this Subscription + */ + ActiveMQDestination getActiveMQDestination(); + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/SubscriptionRecovery.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Nov 27 05:40:11 2006 @@ -51,6 +51,10 @@ public void addMessageLast(MessageReference node) throws Exception{ } + + public void addRecoveredMessage(MessageReference node) throws Exception{ + addMessageLast(node); + } public void clear(){ } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Mon Nov 27 05:40:11 2006 @@ -17,8 +17,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.Message; @@ -45,7 +44,8 @@ private ListContainer diskList; private Iterator iter=null; private 
Destination regionDestination; - private ReentrantLock iterLock=new ReentrantLock(); + private AtomicBoolean iterating=new AtomicBoolean(); + private boolean flushRequired; /** * @param name @@ -67,17 +67,19 @@ * reset the cursor * */ - public void reset(){ - try{ - iterLock.lockInterruptibly(); - iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator(); - }catch(InterruptedException e){ - log.warn("Failed to get lock ",e); - } + public synchronized void reset(){ + synchronized(iterating){ + iterating.set(true); + } + iter=isDiskListEmpty()?memoryList.iterator():getDiskList().listIterator(); } - public void release(){ - iterLock.unlock(); + public synchronized void release(){ + iterating.set(false); + if(flushRequired){ + flushRequired=false; + flushToDisk(); + } } public synchronized void destroy(){ @@ -219,13 +221,12 @@ public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){ if(newPercentUsage>=100){ - try{ - if(iterLock.tryLock(500,TimeUnit.MILLISECONDS)){ + synchronized(iterating){ + flushRequired=true; + if(!iterating.get()){ flushToDisk(); - iterLock.unlock(); + flushRequired=false; } - }catch(InterruptedException e){ - log.warn("caught an exception aquiring lock",e); } } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Mon Nov 27 05:40:11 2006 @@ -76,6 +76,13 @@ * @throws Exception */ 
public void addMessageFirst(MessageReference node) throws Exception; + + /** + * Add a message recovered from a retroactive policy + * @param node + * @throws Exception + */ + public void addRecoveredMessage(MessageReference node) throws Exception; /** * @return true if there pending messages to dispatch Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Nov 27 05:40:11 2006 @@ -147,6 +147,10 @@ } } } + + public void addRecoveredMessage(MessageReference node) throws Exception{ + nonPersistent.addMessageLast(node); + } public void clear(){ pendingCount=0; Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006 @@ -22,6 +22,7 @@ import 
org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -53,7 +54,7 @@ return true; } - synchronized public void recover(ConnectionContext context,Topic topic,Subscription sub) throws Exception{ + synchronized public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{ // Re-dispatch the last message seen. int t=tail; // The buffer may not have rolled over yet..., start from the front @@ -63,18 +64,9 @@ if(messages[t]==null) return; // Keep dispatching until t hit's tail again. - MessageEvaluationContext msgContext=context.getMessageEvaluationContext(); do{ MessageReference node=messages[t]; - try{ - msgContext.setDestination(node.getRegionDestination().getActiveMQDestination()); - msgContext.setMessageReference(node); - if(sub.matches(node,msgContext)){ - sub.add(node); - } - }finally{ - msgContext.clear(); - } + sub.addRecoveredMessage(context,node); t++; if(t>=messages.length) t=0; Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java Mon Nov 27 
05:40:11 2006 @@ -17,23 +17,18 @@ */ package org.apache.activemq.broker.region.policy; +import java.util.Iterator; +import java.util.List; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; -import org.apache.activemq.broker.region.policy.TimedSubscriptionRecoveryPolicy.TimestampWrapper; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; -import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.memory.list.DestinationBasedMessageList; import org.apache.activemq.memory.list.MessageList; import org.apache.activemq.memory.list.SimpleMessageList; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - /** * This implementation of {@link SubscriptionRecoveryPolicy} will keep a fixed * amount of memory available in RAM for message history which is evicted in @@ -61,22 +56,13 @@ return true; } - public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception { + public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{ // Re-dispatch the messages from the buffer. 
- List copy = buffer.getMessages(sub); - if( !copy.isEmpty() ) { - MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); - try { - for (Iterator iter = copy.iterator(); iter.hasNext();) { - MessageReference node = (MessageReference) iter.next(); - msgContext.setDestination(node.getRegionDestination().getActiveMQDestination()); - msgContext.setMessageReference(node); - if (sub.matches(node, msgContext) ) { - sub.add(node); - } - } - } finally { - msgContext.clear(); + List copy=buffer.getMessages(sub.getActiveMQDestination()); + if(!copy.isEmpty()){ + for(Iterator iter=copy.iterator();iter.hasNext();){ + MessageReference node=(MessageReference)iter.next(); + sub.addRecoveredMessage(context,node); } } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006 @@ -19,15 +19,13 @@ import java.util.ArrayList; import java.util.List; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.filter.DestinationFilter; -import 
org.apache.activemq.filter.MessageEvaluationContext; /** * This implementation of {@link SubscriptionRecoveryPolicy} will only keep @@ -46,20 +44,11 @@ return true; } - public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception { + public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{ // Re-dispatch the last message seen. - MessageReference node = lastImage; - if( node != null ){ - MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); - try { - msgContext.setDestination(node.getRegionDestination().getActiveMQDestination()); - msgContext.setMessageReference(node); - if (sub.matches(node, msgContext)) { - sub.add(node); - } - } finally { - msgContext.clear(); - } + MessageReference node=lastImage; + if(node!=null){ + sub.addRecoveredMessage(context,node); } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006 @@ -20,6 +20,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import 
org.apache.activemq.command.Message; @@ -43,7 +44,7 @@ return true; } - public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception { + public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception { } public void start() throws Exception { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006 @@ -24,6 +24,7 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -66,20 +67,16 @@ return query.validateUpdate(message.getMessage()); } - public void recover(ConnectionContext context, final Topic topic, final Subscription sub) throws Exception { - if (query != null) { - final MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); - try { - ActiveMQDestination destination = sub.getConsumerInfo().getDestination(); - query.execute(destination, new MessageListener() { - public void onMessage(Message message) { - dispatchInitialMessage(message, topic, msgContext, 
sub); - } - }); - } - finally { - msgContext.clear(); - } + public void recover(final ConnectionContext context,final Topic topic,final SubscriptionRecovery sub) + throws Exception{ + if(query!=null){ + ActiveMQDestination destination=sub.getActiveMQDestination(); + query.execute(destination,new MessageListener(){ + + public void onMessage(Message message){ + dispatchInitialMessage(message,topic,context,sub); + } + }); } } @@ -107,21 +104,17 @@ return new org.apache.activemq.command.Message[0]; } - protected void dispatchInitialMessage(Message message, Destination regionDestination, MessageEvaluationContext msgContext, Subscription sub) { + protected void dispatchInitialMessage(Message message, Destination regionDestination, ConnectionContext context, SubscriptionRecovery sub) { try { ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null); ActiveMQDestination destination = activeMessage.getDestination(); if (destination == null) { - destination = sub.getConsumerInfo().getDestination(); + destination = sub.getActiveMQDestination(); activeMessage.setDestination(destination); } activeMessage.setRegionDestination(regionDestination); configure(activeMessage); - msgContext.setDestination(destination); - msgContext.setMessageReference(activeMessage); - if (sub.matches(activeMessage, msgContext)) { - sub.add(activeMessage); - } + sub.addRecoveredMessage(context,activeMessage); } catch (Throwable e) { log.warn("Failed to dispatch initial message: " + message + " into subscription. 
Reason: " + e, e); Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006 @@ -22,6 +22,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -55,7 +56,7 @@ * @param node * @throws Exception */ - void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception; + void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception; /** @@ -67,6 +68,7 @@ /** * Used to copy the policy object. 
+ * @return the copy */ SubscriptionRecoveryPolicy copy(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java Mon Nov 27 05:40:11 2006 @@ -22,10 +22,9 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.SubscriptionRecovery; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -80,25 +79,15 @@ return true; } - public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Exception { - + public void recover(ConnectionContext context,Topic topic,SubscriptionRecovery sub) throws Exception{ // Re-dispatch the messages from the buffer. 
- ArrayList copy = new ArrayList(buffer); - - if (!copy.isEmpty()) { - MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); - try { - for (Iterator iter = copy.iterator(); iter.hasNext();) { - TimestampWrapper timestampWrapper = (TimestampWrapper) iter.next(); - MessageReference message = timestampWrapper.message; - msgContext.setDestination(message.getRegionDestination().getActiveMQDestination()); - msgContext.setMessageReference(message); - if (sub.matches(message, msgContext)) { - sub.add(timestampWrapper.message); - } - } - }finally { - msgContext.clear(); + ArrayList copy=new ArrayList(buffer); + if(!copy.isEmpty()){ + MessageEvaluationContext msgContext=context.getMessageEvaluationContext(); + for(Iterator iter=copy.iterator();iter.hasNext();){ + TimestampWrapper timestampWrapper=(TimestampWrapper)iter.next(); + MessageReference message=timestampWrapper.message; + sub.addRecoveredMessage(context,message); } } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java Mon Nov 27 05:40:11 2006 @@ -74,7 +74,7 @@ return getMessages(sub.getConsumerInfo().getDestination()); } - protected List getMessages(ActiveMQDestination destination) { + public List getMessages(ActiveMQDestination destination) { Set set = null; synchronized (lock) { set = subscriptionIndex.get(destination); Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java Mon Nov 27 05:40:11 2006 @@ -17,13 +17,11 @@ */ package org.apache.activemq.memory.list; +import java.util.List; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; -import java.util.List; - /** * A container of messages which is used to store messages and then * replay them later for a given subscription. 
@@ -37,7 +35,7 @@ /** * Returns the current list of MessageReference objects for the given subscription */ - List getMessages(Subscription sub); + List getMessages(ActiveMQDestination destination); /** * @param destination Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java Mon Nov 27 05:40:11 2006 @@ -17,21 +17,18 @@ */ package org.apache.activemq.memory.list; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.network.DemandForwardingBridge; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - /** * A simple fixed size {@link MessageList} where there is a single, fixed size * list that all messages are added to for simplicity. 
Though this @@ -66,7 +63,7 @@ } } - public List getMessages(Subscription sub) { + public List getMessages(ActiveMQDestination destination) { return getList(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Mon Nov 27 05:40:11 2006 @@ -112,7 +112,16 @@ if(!subscriberContainer.containsKey(key)){ subscriberContainer.put(key,info); } - addSubscriberMessageContainer(key); + ListContainer container=addSubscriberMessageContainer(key); + if(retroactive){ + for(StoreEntry entry=ackContainer.getFirst();entry!=null;){ + TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); + ConsumerMessageRef ref=new ConsumerMessageRef(); + ref.setAckEntry(entry); + ref.setMessageEntry(tsa.getMessageEntry()); + container.add(ref); + } + } } public synchronized void deleteSubscription(String clientId,String subscriptionName){ @@ -207,12 +216,13 @@ return result; } - protected void addSubscriberMessageContainer(Object key) throws IOException{ + protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{ ListContainer container=store.getListContainer(key,"topic-subs"); Marshaller marshaller=new ConsumerMessageRefMarshaller(); container.setMarshaller(marshaller); TopicSubContainer tsc=new TopicSubContainer(container); subscriberMessages.put(key,tsc); + return container; } public int getMessageCount(String clientId,String subscriberName) throws IOException{ Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Mon Nov 27 05:40:11 2006 @@ -19,6 +19,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -37,6 +38,7 @@ protected final ActiveMQDestination destination; protected final Map messageTable; + protected MessageId lastBatchId; public MemoryMessageStore(ActiveMQDestination destination){ this(destination,new LinkedHashMap()); @@ -115,12 +117,32 @@ return messageTable.size(); } - public void resetBatching(MessageId nextToDispatch){ - } - + public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{ + synchronized(messageTable){ + + boolean pastLackBatch=lastBatchId==null; + int count = 0; + for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ + Map.Entry entry=(Entry)iter.next(); + if(pastLackBatch){ + count++; + Object msg=entry.getValue(); + lastBatchId = (MessageId)entry.getKey(); + if(msg.getClass()==String.class){ + listener.recoverMessageReference((String)msg); + }else{ + listener.recoverMessage((Message)msg); + } + }else{ + pastLackBatch=entry.getKey().equals(lastBatchId); + } + } + listener.finished(); + } } public void resetBatching(){ + lastBatchId = null; } } Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Mon Nov 27 05:40:11 2006 @@ -37,10 +37,11 @@ private Map ackDatabase; private Map subscriberDatabase; + private Map batchDatabase; MessageId lastMessageId; public MemoryTopicMessageStore(ActiveMQDestination destination){ - this(destination,new LinkedHashMap(),makeMap(),makeMap()); + this(destination,new LinkedHashMap(),makeMap(),makeMap(),makeMap()); } protected static Map makeMap(){ @@ -48,10 +49,11 @@ } public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase, - Map ackDatabase){ + Map ackDatabase, Map batchDatabase){ super(destination,messageTable); this.subscriberDatabase=subscriberDatabase; this.ackDatabase=ackDatabase; + this.batchDatabase=batchDatabase; } public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{ @@ -110,13 +112,10 @@ } listener.finished(); } + } - public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned, - MessageRecoveryListener listener) throws Exception{ - listener.finished(); - } - + public void delete(){ super.delete(); ackDatabase.clear(); @@ -128,14 +127,6 @@ return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]); } - public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) 
throws IOException{ - return null; - } - - public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) - throws IOException{ - return null; - } public int getMessageCount(String clientId,String subscriberName) throws IOException{ int result=0; @@ -143,24 +134,56 @@ // the message table is a synchronizedMap - so just have to synchronize here synchronized(messageTable){ result=messageTable.size(); - for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ - Map.Entry entry=(Entry)iter.next(); - if(entry.getKey().equals(lastAck)){ - break; + if(lastAck!=null){ + for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){ + Map.Entry entry=(Entry)iter.next(); + if(entry.getKey().equals(lastAck)){ + break; + } + result--; } - result--; } } return result; } - public void resetBatching(String clientId,String subscriptionName,MessageId id){ - } - + public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, MessageRecoveryListener listener) throws Exception{ + SubscriptionKey key = new SubscriptionKey(clientId,subscriptionName); + MessageId lastBatch = (MessageId)batchDatabase.get(key); + if (lastBatch==null) { + //if last batch null - start from last ack + lastBatch = (MessageId)ackDatabase.get(key); + } + boolean pastLackBatch=lastBatch==null; + MessageId lastId = null; + // the message table is a synchronizedMap - so just have to synchronize here + int count = 0; + synchronized(messageTable){ + for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() &&count < maxReturned ;){ + Map.Entry entry=(Entry)iter.next(); + if(pastLackBatch){ + count++; + Object msg=entry.getValue(); + lastId = (MessageId)entry.getKey(); + if(msg.getClass()==String.class){ + listener.recoverMessageReference((String)msg); + }else{ + listener.recoverMessage((Message)msg); + } + }else{ + pastLackBatch=entry.getKey().equals(lastBatch); + } + } + if (lastId != null) { + 
batchDatabase.put(key,lastId); + } + listener.finished(); + } } public void resetBatching(String clientId,String subscriptionName){ + batchDatabase.remove(new SubscriptionKey(clientId,subscriptionName)); } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=479614&r1=479613&r2=479614 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Mon Nov 27 05:40:11 2006 @@ -120,7 +120,16 @@ if(!subscriberContainer.containsKey(key)){ subscriberContainer.put(key,info); } - addSubscriberMessageContainer(key); + ListContainer container=addSubscriberMessageContainer(key); + if(retroactive){ + for(StoreEntry entry=ackContainer.getFirst();entry!=null;){ + TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); + ConsumerMessageRef ref=new ConsumerMessageRef(); + ref.setAckEntry(entry); + ref.setMessageEntry(tsa.getMessageEntry()); + container.add(ref); + } + } } public synchronized void deleteSubscription(String clientId,String subscriptionName){ @@ -204,12 +213,13 @@ return result; } - protected void addSubscriberMessageContainer(Object key) throws IOException{ + protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{ ListContainer container=store.getListContainer(key,"topic-subs"); Marshaller marshaller=new ConsumerMessageRefMarshaller(); container.setMarshaller(marshaller); TopicSubContainer tsc=new TopicSubContainer(container); subscriberMessages.put(key,tsc); + return container; } public int getMessageCount(String clientId,String subscriberName) throws 
IOException{