Author: rajdavies
Date: Fri Jul 20 10:08:10 2007
New Revision: 558054

URL: http://svn.apache.org/viewvc?view=rev&rev=558054
Log:
Fix for:
 http://issues.apache.org/activemq/browse/AMQ-1207
http://issues.apache.org/activemq/browse/AMQ-880
http://issues.apache.org/activemq/browse/AMQ-450
http://issues.apache.org/activemq/browse/AMQ-879

Added:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
 Fri Jul 20 10:08:10 2007
@@ -17,6 +17,7 @@
  */
 package org.apache.activemq.advisory;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.activemq.broker.Broker;
@@ -24,6 +25,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -38,6 +40,8 @@
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -49,7 +53,7 @@
  */
 public class AdvisoryBroker extends BrokerFilter {
     
-    //private static final Log log = LogFactory.getLog(AdvisoryBroker.class);
+    private static final Log log = LogFactory.getLog(AdvisoryBroker.class);
     
     protected final ConcurrentHashMap connections = new ConcurrentHashMap();
     protected final ConcurrentHashMap consumers = new ConcurrentHashMap();
@@ -226,6 +230,16 @@
             ActiveMQTopic topic = 
AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
             producers.remove(info.getProducerId());
             fireProducerAdvisory(context, topic, info.createRemoveCommand());
+        }
+    }
+    
+    public void messageExpired(ConnectionContext context,MessageReference 
messageReference){
+        next.messageExpired(context,messageReference);
+        try{
+            ActiveMQTopic 
topic=AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
+            fireAdvisory(context,topic,messageReference.getMessage());
+        }catch(Exception e){
+            log.warn("Failed to fire message expired advisory");
         }
     }
     

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
 Fri Jul 20 10:08:10 2007
@@ -64,6 +64,13 @@
             return new 
ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX+destination.getPhysicalName());
     }
     
+    public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination 
destination) {
+        if (destination.isQueue()) {
+            return getExpiredQueueMessageAdvisoryTopic(destination);
+        }
+        return getExpiredTopicMessageAdvisoryTopic(destination);
+    }
+    
     public static ActiveMQTopic 
getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
         String name = 
EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName();
         return new ActiveMQTopic(name);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
 Fri Jul 20 10:08:10 2007
@@ -20,19 +20,15 @@
 import java.net.URI;
 import java.util.Set;
 import org.apache.activemq.Service;
-import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Region;
-import 
org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
@@ -135,6 +131,8 @@
     
     /**
      * Gets a list of all the prepared xa transactions.
+     * @param context transaction ids
+     * @return 
      * @throws Exception TODO
      */
     public TransactionId[] getPreparedTransactions(ConnectionContext context) 
throws Exception;
@@ -151,7 +149,7 @@
      * Prepares a transaction. Only valid for xa transactions.
      * @param context
      * @param xid
-     * @return
+     * @return id
      * @throws Exception TODO
      */
     public int prepareTransaction(ConnectionContext context, TransactionId 
xid) throws Exception;
@@ -176,6 +174,9 @@
 
     /**
      * Forgets a transaction.
+     * @param context 
+     * @param transactionId 
+     * @throws Exception 
      */
     public void forgetTransaction(ConnectionContext context, TransactionId 
transactionId) throws Exception;
     
@@ -246,7 +247,35 @@
      */
     public URI getVmConnectorURI();
     
+    /**
+     * called when the brokerService starts
+     */
     public void brokerServiceStarted();
     
+    /**
+     * @return the BrokerService
+     */
     BrokerService getBrokerService();
+    
+    /**
+     * Ensure we get the Broker at the top of the Stack
+     * @return the broker at the top of the Stack
+     */
+    Broker getRoot();
+    
+    /**
+     * A Message has Expired
+     * @param context
+     * @param messageReference
+     * @throws Exception 
+     */
+    public void messageExpired(ConnectionContext context, MessageReference 
messageReference);
+    
+    /**
+     * A message needs to go the a DLQ
+     * @param context
+     * @param messageReference
+     * @throws Exception
+     */
+    public void sendToDeadLetterQueue(ConnectionContext 
context,MessageReference messageReference);
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
 Fri Jul 20 10:08:10 2007
@@ -17,9 +17,12 @@
  */
 package org.apache.activemq.broker;
 
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
-import 
org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -38,10 +41,6 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 
-import java.net.URI;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Allows you to intercept broker operation so that features such as security 
can be 
  * implemented as a pluggable filter.
@@ -245,5 +244,17 @@
     
     public BrokerService getBrokerService(){
         return next.getBrokerService();
+    }
+
+    public void messageExpired(ConnectionContext context,MessageReference 
message){
+        next.messageExpired(context,message); 
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext 
context,MessageReference messageReference){
+       next.sendToDeadLetterQueue(context,messageReference);       
+    }
+
+    public Broker getRoot() {
+       return next.getRoot();
     }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
 Fri Jul 20 10:08:10 2007
@@ -17,9 +17,13 @@
  */
 package org.apache.activemq.broker;
 
+import java.net.URI;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
-import 
org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -38,11 +42,6 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 
-import java.net.URI;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Dumb implementation - used to be overriden by listeners
  * 
@@ -245,4 +244,14 @@
     public BrokerService getBrokerService(){
         return null;
     }
+
+    public void messageExpired(ConnectionContext context,MessageReference 
message){        
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext 
context,MessageReference messageReference){        
+    }
+    
+    public Broker getRoot(){
+        return null;
+     }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
 Fri Jul 20 10:08:10 2007
@@ -21,10 +21,9 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
-import 
org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -245,4 +244,16 @@
     public BrokerService getBrokerService(){
         throw new BrokerStoppedException(this.message);
     }
+
+    public void messageExpired(ConnectionContext context,MessageReference 
message){
+       throw new BrokerStoppedException(this.message);        
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext 
context,MessageReference messageReference){
+       throw new BrokerStoppedException(this.message); 
+    }
+    
+    public Broker getRoot(){
+        throw new BrokerStoppedException(this.message);
+     }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
 Fri Jul 20 10:08:10 2007
@@ -17,9 +17,12 @@
  */
 package org.apache.activemq.broker;
 
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
-import 
org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -38,10 +41,6 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 
-import java.net.URI;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Like a BrokerFilter but it allows you to switch the getNext().broker.  This 
has more 
  * overhead than a BrokerFilter since access to the getNext().broker has to 
synchronized
@@ -258,6 +257,19 @@
     
     public BrokerService getBrokerService(){
         return getNext().getBrokerService();
+    }
+
+   
+    public void messageExpired(ConnectionContext context,MessageReference 
message){
+        getNext().messageExpired(context,message);        
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext 
context,MessageReference messageReference) {
+       getNext().sendToDeadLetterQueue(context,messageReference);
+    }
+    
+    public Broker getRoot(){
+        return getNext().getRoot();
     }
 
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 Fri Jul 20 10:08:10 2007
@@ -332,14 +332,15 @@
                     // Try to auto create the destination... re-invoke broker 
from the
                     // top so that the proper security checks are performed.
                     try {
+                        
+                        
context.getBroker().addDestination(context,destination);
                         dest = addDestination(context, destination);
-                        
//context.getBroker().addDestination(context,destination);
                     }
                     catch (DestinationAlreadyExistsException e) {
                         // if the destination already exists then lets ignore 
this error
                     }
                     // We should now have the dest created.
-                    //dest=(Destination) destinations.get(destination);
+                    dest=(Destination) destinations.get(destination);
                 }
                 if(dest==null){
                     throw new JMSException("The destination "+destination+" 
does not exist.");

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
 Fri Jul 20 10:08:10 2007
@@ -42,118 +42,121 @@
  * @author [EMAIL PROTECTED]
  * @version $Revision$
  */
-public class DestinationFactoryImpl extends DestinationFactory {
+public class DestinationFactoryImpl extends DestinationFactory{
 
     protected final UsageManager memoryManager;
     protected final TaskRunnerFactory taskRunnerFactory;
     protected final PersistenceAdapter persistenceAdapter;
     protected RegionBroker broker;
 
-    public DestinationFactoryImpl(UsageManager memoryManager, 
TaskRunnerFactory taskRunnerFactory,
-            PersistenceAdapter persistenceAdapter) {
-        this.memoryManager = memoryManager;
-        this.taskRunnerFactory = taskRunnerFactory;
-        if (persistenceAdapter == null) {
+    public DestinationFactoryImpl(UsageManager memoryManager,TaskRunnerFactory 
taskRunnerFactory,
+            PersistenceAdapter persistenceAdapter){
+        this.memoryManager=memoryManager;
+        this.taskRunnerFactory=taskRunnerFactory;
+        if(persistenceAdapter==null){
             throw new IllegalArgumentException("null persistenceAdapter");
         }
-        this.persistenceAdapter = persistenceAdapter;
+        this.persistenceAdapter=persistenceAdapter;
     }
 
-    public void setRegionBroker(RegionBroker broker) {
-        if (broker == null) {
+    public void setRegionBroker(RegionBroker broker){
+        if(broker==null){
             throw new IllegalArgumentException("null broker");
         }
-        this.broker = broker;
+        this.broker=broker;
     }
 
-    public Set getDestinations() {
+    public Set getDestinations(){
         return persistenceAdapter.getDestinations();
     }
 
     /**
      * @return instance of [EMAIL PROTECTED] Queue} or [EMAIL PROTECTED] Topic}
      */
-    public Destination createDestination(ConnectionContext context, 
ActiveMQDestination destination, DestinationStatistics destinationStatistics) 
throws Exception {
-        if (destination.isQueue()) {
-            if (destination.isTemporary()) {
-                final ActiveMQTempDestination tempDest = 
(ActiveMQTempDestination) destination;
-                return new Queue(destination, memoryManager, null, 
destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) {
-                   
-                    public void addSubscription(ConnectionContext 
context,Subscription sub) throws Exception {
+    public Destination createDestination(ConnectionContext 
context,ActiveMQDestination destination,
+            DestinationStatistics destinationStatistics) throws Exception{
+        if(destination.isQueue()){
+            if(destination.isTemporary()){
+                final ActiveMQTempDestination 
tempDest=(ActiveMQTempDestination)destination;
+                return new 
Queue(broker.getRoot(),destination,memoryManager,null,
+                        
destinationStatistics,taskRunnerFactory,broker.getTempDataStore()){
+
+                    public void addSubscription(ConnectionContext 
context,Subscription sub) throws Exception{
                         // Only consumers on the same connection can consume 
from 
                         // the temporary destination
-                        if( !tempDest.getConnectionId().equals( 
sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
+                        
if(!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())){
                             throw new JMSException("Cannot subscribe to remote 
temporary destination: "+tempDest);
                         }
-                        super.addSubscription(context, sub);
+                        super.addSubscription(context,sub);
                     };
                 };
-            } else {
-                MessageStore store = 
persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
-                Queue queue = new Queue(destination, memoryManager, store, 
destinationStatistics, taskRunnerFactory,broker.getTempDataStore());
-                configureQueue(queue, destination);
+            }else{
+                MessageStore 
store=persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
+                Queue queue=new 
Queue(broker.getRoot(),destination,memoryManager,store,
+                        
destinationStatistics,taskRunnerFactory,broker.getTempDataStore());
+                configureQueue(queue,destination);
                 queue.initialize();
                 return queue;
             }
-        } else if (destination.isTemporary()){
-            final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) 
destination;
-            return new Topic(destination, null, memoryManager, 
destinationStatistics, taskRunnerFactory) {
-                public void addSubscription(ConnectionContext 
context,Subscription sub) throws Exception {
+        }else if(destination.isTemporary()){
+            final ActiveMQTempDestination 
tempDest=(ActiveMQTempDestination)destination;
+            return new Topic(broker.getRoot(),destination,null,memoryManager,
+                    destinationStatistics,taskRunnerFactory){
+
+                public void addSubscription(ConnectionContext 
context,Subscription sub) throws Exception{
                     // Only consumers on the same connection can consume from 
                     // the temporary destination
-                    if( !tempDest.getConnectionId().equals( 
sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
+                    
if(!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())){
                         throw new JMSException("Cannot subscribe to remote 
temporary destination: "+tempDest);
                     }
-                    super.addSubscription(context, sub);
+                    super.addSubscription(context,sub);
                 };
             };
-        } else {
-            TopicMessageStore store = null;
-            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
-                store = 
persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
+        }else{
+            TopicMessageStore store=null;
+            if(!AdvisorySupport.isAdvisoryTopic(destination)){
+                
store=persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination);
             }
-            
-            Topic topic = new Topic(destination, store, memoryManager, 
destinationStatistics, taskRunnerFactory);
-            configureTopic(topic, destination);
-            
+            Topic topic=new 
Topic(broker.getRoot(),destination,store,memoryManager,
+                    destinationStatistics,taskRunnerFactory);
+            configureTopic(topic,destination);
             return topic;
         }
     }
 
-    protected void configureQueue(Queue queue, ActiveMQDestination 
destination) {
-        if (broker == null) {
+    protected void configureQueue(Queue queue,ActiveMQDestination destination){
+        if(broker==null){
             throw new IllegalStateException("broker property is not set");
         }
-        if (broker.getDestinationPolicy() != null) {
-            PolicyEntry entry = 
broker.getDestinationPolicy().getEntryFor(destination);
-            if (entry != null) {
+        if(broker.getDestinationPolicy()!=null){
+            PolicyEntry 
entry=broker.getDestinationPolicy().getEntryFor(destination);
+            if(entry!=null){
                 entry.configure(queue,broker.getTempDataStore());
             }
         }
     }
 
-    protected void configureTopic(Topic topic, ActiveMQDestination 
destination) {
-        if (broker == null) {
+    protected void configureTopic(Topic topic,ActiveMQDestination destination){
+        if(broker==null){
             throw new IllegalStateException("broker property is not set");
         }
-        if (broker.getDestinationPolicy() != null) {
-            PolicyEntry entry = 
broker.getDestinationPolicy().getEntryFor(destination);
-            if (entry != null) {
+        if(broker.getDestinationPolicy()!=null){
+            PolicyEntry 
entry=broker.getDestinationPolicy().getEntryFor(destination);
+            if(entry!=null){
                 entry.configure(topic);
             }
         }
     }
 
-    public long getLastMessageBrokerSequenceId() throws IOException {
+    public long getLastMessageBrokerSequenceId() throws IOException{
         return persistenceAdapter.getLastMessageBrokerSequenceId();
     }
 
-    public PersistenceAdapter getPersistenceAdapter() {
+    public PersistenceAdapter getPersistenceAdapter(){
         return persistenceAdapter;
     }
 
-    public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) 
throws IOException {
+    public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) 
throws IOException{
         return 
persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
     }
-
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Fri Jul 20 10:08:10 2007
@@ -276,17 +276,7 @@
      * @throws Exception
      */
     protected void sendToDLQ(final ConnectionContext context,final 
MessageReference node) throws IOException,Exception{
-        // Send the message to the DLQ
-        Message message=node.getMessage();
-        if(message!=null){
-            // The original destination and transaction id do not get filled 
when the message is first
-            // sent,
-            // it is only populated if the message is routed to another 
destination like the DLQ
-            DeadLetterStrategy 
deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
-            ActiveMQDestination deadLetterDestination=deadLetterStrategy
-                    .getDeadLetterQueueFor(message.getDestination());
-            BrokerSupport.resend(context,message,deadLetterDestination);
-        }
+        broker.sendToDeadLetterQueue(context,node);
     }
 
     /**
@@ -393,7 +383,9 @@
                             // Message may have been sitting in the pending 
list a while
                             // waiting for the consumer to ak the message.
                             
if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
-                                continue; // just drop it.
+                                broker.messageExpired(getContext(),node);
+                                dequeueCounter++;
+                                continue;
                             }
                             dispatch(node);
                             count++;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Fri Jul 20 10:08:10 2007
@@ -27,6 +27,7 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -72,7 +73,6 @@
 public class Queue implements Destination, Task {
 
     private final Log log;
-
     private final ActiveMQDestination destination;
     private final List consumers = new CopyOnWriteArrayList();
     private final Valve dispatchValve = new Valve(true);
@@ -96,9 +96,11 @@
     private final Object doDispatchMutex = new Object();
     private TaskRunner taskRunner;
     private boolean started = false;
+    final Broker broker;
     
-    public Queue(ActiveMQDestination destination, final UsageManager 
memoryManager, MessageStore store, DestinationStatistics parentStats,
+    public Queue(Broker broker,ActiveMQDestination destination, final 
UsageManager memoryManager, MessageStore store, DestinationStatistics 
parentStats,
             TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
+        this.broker=broker;
         this.destination = destination;
         this.usageManager = new 
UsageManager(memoryManager,destination.toString());
         this.usageManager.setUsagePortion(1.0f);
@@ -136,7 +138,8 @@
                     public void recoverMessage(Message message){
                         // Message could have expired while it was being 
loaded..
                         if(message.isExpired()){
-                            // TODO remove from store
+                            
broker.messageExpired(createConnectionContext(),message);
+                            destinationStatistics.getMessages().decrement();
                             return;
                         }
                         message.setRegionDestination(Queue.this);
@@ -342,9 +345,8 @@
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
         if(message.isExpired()){
-            if (log.isDebugEnabled()) {
-                log.debug("Expired message: " + message);
-            }
+            broker.messageExpired(context,message);
+            destinationStatistics.getMessages().decrement();
             if( ( !message.isResponseRequired() || 
producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && 
!context.isInRecoveryMode() ) {
                        ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
                                context.getConnection().dispatchAsync(ack);     
                                                
@@ -365,9 +367,8 @@
                                                
                                                // While waiting for space to 
free up... the message may have expired.
                                        if(message.isExpired()){
-                                           if (log.isDebugEnabled()) {
-                                               log.debug("Expired message: " + 
message);
-                                           }
+                                           
broker.messageExpired(context,message);
+                                
destinationStatistics.getMessages().decrement();
                                            
                                            if( !message.isResponseRequired() 
&& !context.isInRecoveryMode() ) {
                                                        ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
@@ -440,10 +441,8 @@
                         // It could take while before we receive the commit
                         // op, by that time the message could have expired..
                            if(message.isExpired()){
-                               // TODO: remove message from store.
-                               if (log.isDebugEnabled()) {
-                                   log.debug("Expired message: " + message);
-                               }
+                               broker.messageExpired(context,message);
+                            destinationStatistics.getMessages().decrement();
                                return;
                            }
                            sendMessage(context,message);
@@ -1011,9 +1010,8 @@
                                 result.add(node);
                                 count++;
                             }else{
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Expired message: " + node);
-                                }
+                                
broker.messageExpired(createConnectionContext(),node);
+                                
destinationStatistics.getMessages().decrement();
                             }
                         }
                     }finally{

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 Fri Jul 20 10:08:10 2007
@@ -37,6 +37,7 @@
 import org.apache.activemq.broker.DestinationAlreadyExistsException;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import 
org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import 
org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
@@ -62,6 +63,7 @@
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.BrokerSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.ServiceStopper;
@@ -625,6 +627,52 @@
     public BrokerService getBrokerService(){
         return brokerService;
     }
-    
-    
+
+    public void messageExpired(ConnectionContext context,MessageReference 
node){
+        if(log.isDebugEnabled()){
+            log.debug("Message expired "+node);
+        }
+        getRoot().sendToDeadLetterQueue(context,node);
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext 
context,MessageReference node){
+        try{
+            if(node!=null){
+                Message message=node.getMessage();
+                if(message!=null){
+                    DeadLetterStrategy 
deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
+                    if(deadLetterStrategy!=null){
+                        
if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
+                            long expiration=message.getExpiration();
+                            message.setExpiration(0);
+                            message.setProperty("originalExpiration",new 
Long(expiration));
+                            if(!message.isPersistent()){
+                                message.setPersistent(true);
+                                
message.setProperty("originalDeliveryMode","NON_PERSISTENT");
+                            }
+                            // The original destination and transaction id do 
not get filled when the message is first
+                            // sent,
+                            // it is only populated if the message is routed 
to another destination like the DLQ
+                            ActiveMQDestination 
deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message
+                                    .getDestination());
+                            
BrokerSupport.resend(context,message,deadLetterDestination);
+                        }
+                    }
+                }else{
+                    log.warn("Null message for node: "+node);
+                }
+            }
+        }catch(Exception e){
+            log.warn("Failed to pass expired message to dead letter queue");
+        }
+    }
+
+    public Broker getRoot(){
+        try{
+            return getBrokerService().getBroker();
+        }catch(Exception e){
+            log.fatal("Trying to get Root Broker "+e);
+            throw new RuntimeException("The broker from the BrokerService 
should not throw an exception");
+        }
+    }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
 Fri Jul 20 10:08:10 2007
@@ -41,7 +41,7 @@
     
     protected Destination createDestination(ConnectionContext context, 
ActiveMQDestination destination) throws Exception {
         final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) 
destination;
-        return new Queue(destination, memoryManager, null, 
destinationStatistics, taskRunnerFactory, null) {
+        return new Queue(broker.getRoot(),destination, memoryManager, null, 
destinationStatistics, taskRunnerFactory, null) {
             
             public void addSubscription(ConnectionContext context,Subscription 
sub) throws Exception {
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 Fri Jul 20 10:08:10 2007
@@ -24,6 +24,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@@ -72,10 +73,11 @@
     private boolean sendAdvisoryIfNoConsumers;
     private DeadLetterStrategy deadLetterStrategy = new 
SharedDeadLetterStrategy();
     private final ConcurrentHashMap durableSubcribers = new 
ConcurrentHashMap();
+    final Broker broker;
     
-    public Topic(ActiveMQDestination destination, TopicMessageStore store, 
UsageManager memoryManager, DestinationStatistics parentStats,
+    public Topic(Broker broker,ActiveMQDestination destination, 
TopicMessageStore store, UsageManager memoryManager, DestinationStatistics 
parentStats,
             TaskRunnerFactory taskFactory) {
-
+        this.broker=broker;
         this.destination = destination;
         this.store = store; //this could be NULL! (If an advsiory)
         this.usageManager = new 
UsageManager(memoryManager,destination.toString());
@@ -261,9 +263,8 @@
        // There is delay between the client sending it and it arriving at the
        // destination.. it may have expired.
        if( message.isExpired() ) {
-            if (log.isDebugEnabled()) {
-                log.debug("Expired message: " + message);
-            }
+            broker.messageExpired(context,message);
+            destinationStatistics.getMessages().decrement();
             if( ( !message.isResponseRequired() || 
producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && 
!context.isInRecoveryMode() ) {
                        ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
                                context.getConnection().dispatchAsync(ack);     
                                                
@@ -285,9 +286,8 @@
                                                
                                                // While waiting for space to 
free up... the message may have expired.
                                        if(message.isExpired()){
-                                           if (log.isDebugEnabled()) {
-                                               log.debug("Expired message: " + 
message);
-                                           }
+                                           
broker.messageExpired(context,message);
+                                
destinationStatistics.getMessages().decrement();
                                            
                                            if( !message.isResponseRequired() 
&& !context.isInRecoveryMode() ) {
                                                        ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
@@ -357,7 +357,9 @@
                        // It could take while before we receive the commit
                        // operration.. by that time the message could have 
expired..
                        if( message.isExpired() ) {
-                               // TODO: remove message from store.
+                               broker.messageExpired(context,message);
+                            message.decrementReferenceCount();
+                            destinationStatistics.getMessages().decrement();
                                return;
                        }
                         dispatch(context, message);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 Fri Jul 20 10:08:10 2007
@@ -103,12 +103,7 @@
                             int messagesToEvict=oldMessages.length;
                             for(int i=0;i<messagesToEvict;i++){
                                 MessageReference oldMessage=oldMessages[i];
-                                oldMessage.decrementReferenceCount();
-                                matched.remove(oldMessage);
-                                discarded++;
-                                if(log.isDebugEnabled()){
-                                    log.debug("Discarding message 
"+oldMessages[i]);
-                                }
+                                discard(oldMessage);
                             }
                             // lets avoid an infinite loop if we are given a 
bad eviction strategy
                             // for a bad strategy lets just not evict
@@ -138,6 +133,7 @@
                     matched.remove();
                     dispatchedCounter.incrementAndGet();
                     node.decrementReferenceCount();
+                    broker.messageExpired(getContext(),node);
                     break;
                 }
             }
@@ -367,6 +363,8 @@
                     // waiting for the consumer to ak the message.
                     if(message.isExpired()){
                         message.decrementReferenceCount();
+                        broker.messageExpired(getContext(),message);
+                        dequeueCounter.incrementAndGet();
                         continue; // just drop it.
                     }
                     dispatch(message);
@@ -409,6 +407,17 @@
             
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
             node.decrementReferenceCount();
         }
+    }
+    
+    private void discard(MessageReference message) {
+        message.decrementReferenceCount();
+        matched.remove(message);
+        discarded++;
+        dequeueCounter.incrementAndGet();
+        if(log.isDebugEnabled()){
+            log.debug("Discarding message "+message);
+        }
+        broker.getRoot().sendToDeadLetterQueue(getContext(),message);
     }
 
     public String toString(){

Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java?view=auto&rev=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
 (added)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
 Fri Jul 20 10:08:10 2007
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.command.Message;
+
+/**
+ * A strategy for choosing which destination is used for dead letter queue 
messages.
+ * 
+ * @version $Revision: 426366 $
+ */
+public abstract  class AbstractDeadLetterStrategy implements 
DeadLetterStrategy {
+    private boolean processNonPersistent=true;
+    private boolean processExpired=true;
+    
+    public boolean isSendToDeadLetterQueue(Message message){
+        boolean result=false;
+        if(message!=null){
+            result=true;
+            if(message.isPersistent()==false&&processNonPersistent==false){
+                result=false;
+            }
+            if(message.isExpired()&&processExpired==false){
+                result=false;
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * @return the processExpired
+     */
+    public boolean isProcessExpired(){
+        return this.processExpired;
+    }
+    
+    /**
+     * @param processExpired the processExpired to set
+     */
+    public void setProcessExpired(boolean processExpired){
+        this.processExpired=processExpired;
+    }
+    
+    /**
+     * @return the processNonPersistent
+     */
+    public boolean isProcessNonPersistent(){
+        return this.processNonPersistent;
+    }
+    
+    /**
+     * @param processNonPersistent the processNonPersistent to set
+     */
+    public void setProcessNonPersistent(boolean processNonPersistent){
+        this.processNonPersistent=processNonPersistent;
+    }
+
+ 
+}

Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
 Fri Jul 20 10:08:10 2007
@@ -18,6 +18,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
 
 /**
  * A strategy for choosing which destination is used for dead letter queue 
messages.
@@ -25,6 +26,14 @@
  * @version $Revision$
  */
 public interface DeadLetterStrategy {
+    
+    /**
+     * Allow pluggable strategy for deciding if message should be sent to a 
dead letter queue
+     * for example, you might not want to ignore expired or non-persistent 
messages
+     * @param message
+     * @return true if message should be sent to a dead letter queue
+     */
+    public boolean isSendToDeadLetterQueue(Message message);
 
     /**
      * Returns the dead letter queue for the given destination.

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
 Fri Jul 20 10:08:10 2007
@@ -29,7 +29,7 @@
  * 
  * @version $Revision$
  */
-public class IndividualDeadLetterStrategy implements DeadLetterStrategy {
+public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
 
     private String topicPrefix = "ActiveMQ.DLQ.Topic.";
     private String queuePrefix = "ActiveMQ.DLQ.Queue.";

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
 Fri Jul 20 10:08:10 2007
@@ -29,7 +29,7 @@
  * 
  * @version $Revision$
  */
-public class SharedDeadLetterStrategy implements DeadLetterStrategy {
+public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
 
     private ActiveMQDestination deadLetterQueue = new 
ActiveMQQueue("ActiveMQ.DLQ");
 

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
 Fri Jul 20 10:08:10 2007
@@ -19,6 +19,7 @@
 package org.apache.activemq.broker;
 
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import 
org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -243,5 +244,15 @@
     
     public BrokerService getBrokerService(){
         return null;
+    }
+
+    public void messageExpired(ConnectionContext context,MessageReference 
messageReference){
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext 
context,MessageReference messageReference) {
+    }
+
+    public Broker getRoot(){
+        return this;
     }
 }


Reply via email to