Author: robbie
Date: Tue Dec  7 12:24:44 2010
New Revision: 1042999

URL: http://svn.apache.org/viewvc?rev=1042999&view=rev
Log:
QPID-2973: broker support for rejecting messages without requeue, and 
creating+using DLQs

Added:
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
Modified:
    
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
    
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
 Tue Dec  7 12:24:44 2010
@@ -34,7 +34,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.AbstractExchange;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
 import org.apache.qpid.junit.extensions.util.SizeOf;
@@ -204,7 +204,7 @@ public class DiagnosticExchange extends 
         return false;
     }
 
-    public void route(IncomingMessage payload) throws AMQException
+    public void route(InboundMessage payload) throws AMQException
     {
         
         Long value = new Long(SizeOf.getUsedMemory());

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
 Tue Dec  7 12:24:44 2010
@@ -28,7 +28,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -102,7 +102,7 @@ public class TestExchange implements Exc
     {
     }
 
-    public void route(IncomingMessage message) throws AMQException
+    public void route(InboundMessage message) throws AMQException
     {
     }
 

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Tue Dec  7 12:24:44 2010
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -44,12 +45,14 @@ import org.apache.qpid.server.flow.FlowC
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.DLQTransactionalContext;
 import org.apache.qpid.server.txn.LocalTransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
@@ -1092,4 +1095,75 @@ public class AMQChannel
             }
         }
     }
+
+    public void deadLetter(long deliveryTag) throws AMQException
+    {
+        UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
+        QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+        
+        if (rejectedQueueEntry == null)
+        {
+            _log.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
+            return;
+        }
+        else
+        {
+            AMQMessage msg = rejectedQueueEntry.getMessage();
+
+            AMQQueue queue = rejectedQueueEntry.getQueue();
+            Exchange altExchange = queue.getAlternateExchange();
+            
+            //TODO:remove below line, its temporary for some noddy testing only
+//            altExchange = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry().getExchange(new AMQShortString("dle.test"));
+            if (altExchange == null)
+            {
+                _log.warn("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+                rejectedQueueEntry.discard(new StoreContext());
+                return;
+            }
+
+            InboundMessageAdapter adapter = new InboundMessageAdapter(msg);
+            altExchange.route(adapter);
+
+            ArrayList<AMQQueue> destinationQueues = adapter.getEnqueuedList();
+            if (destinationQueues == null || destinationQueues.isEmpty())
+            {
+                _log.warn("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
+                rejectedQueueEntry.discard(new StoreContext());
+                return;
+            }
+            
+            //increment the message reference count to include the new queue(s)
+            msg.incrementReference(destinationQueues.size());
+            
+            //create a new storeContext to use with a new TransactionContext for the DLQ process
+            StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DLQ deliveryTag: " + deliveryTag);
+            DLQTransactionalContext dlqTxnContext = new DLQTransactionalContext(this, dlqStoreContext);
+            
+            //enqueue the message on the new queues in the store if its persistent
+            if (msg.isPersistent())
+            {
+                MessageStore store = getMessageStore();
+                
+                for (int i = 0; i < destinationQueues.size(); i++)
+                {
+                    store.enqueueMessage(dlqStoreContext, destinationQueues.get(i), msg.getMessageId());
+                }
+            }
+            
+            //TODO: ensure the AMQMessage used is NOT marked IMMEDIATE, to prevent it not being enqueued
+            
+            //configure the txn context to ack consumption from old queue upon commit
+            unackedMap.acknowledgeMessage(deliveryTag, false, dlqTxnContext);
+            
+            //configure the txn context to deliver to the new queues following commit
+            for (int i = 0; i < destinationQueues.size(); i++)
+            {
+                dlqTxnContext.deliver(destinationQueues.get(i), msg);
+            }
+            
+            dlqTxnContext.commit();
+            
+        }
+    }
 }

Added: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java?rev=1042999&view=auto
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java
 (added)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java
 Tue Dec  7 12:24:44 2010
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.InboundMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+
+public class InboundMessageAdapter implements InboundMessage
+{
+    private AMQMessage _message;
+    private ArrayList<AMQQueue> _queues;
+
+    public InboundMessageAdapter(AMQMessage msg)
+    {
+        _message = msg;
+    }
+
+    public Long getMessageId()
+    {
+        return _message.getMessageId();
+    }
+
+    public boolean isPersistent()
+    {
+        return _message.isPersistent();
+    }
+
+    public boolean isRedelivered()
+    {
+        return _message.isRedelivered();
+    }
+
+    public AMQShortString getRoutingKey() throws AMQException
+    {
+        return _message.getRoutingKey();
+    }
+
+    public ContentHeaderBody getContentHeaderBody()
+    {
+        try
+        {
+            return _message.getContentHeaderBody();
+        }
+        catch (AMQException e)
+        {
+            throw new RuntimeException("Error retrieving ContentHeaderBody: " + e, e);
+        }
+    }
+
+    public void enqueue(ArrayList<AMQQueue> queues)
+    {
+        _queues = queues;
+    }
+    
+    public ArrayList<AMQQueue> getEnqueuedList()
+    {
+        return _queues;
+    }
+}

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
 Tue Dec  7 12:24:44 2010
@@ -36,6 +36,8 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.SystemConfiguration;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -694,4 +696,20 @@ public class ServerConfiguration impleme
     {
         return getConfig().getBoolean("statistics.reporting.reset", false);
     }
+
+    /**
+     * String to affix to end of queue name when generating an alternate exchange for DLQ purposes.
+     */
+    public String getDeadLetterExchangeSuffix()
+    {
+        return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+    }
+
+    /**
+     * String to affix to end of queue name when generating a queue for DLQ purposes.
+     */
+    public String getDeadLetterQueueSuffix()
+    {
+        return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+    }
 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
 Tue Dec  7 12:24:44 2010
@@ -36,6 +36,7 @@ import org.apache.qpid.server.virtualhos
 public class DefaultExchangeFactory implements ExchangeFactory
 {
     private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
+    public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
 
     private Map<AMQShortString, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends Exchange>>();
     private final VirtualHost _host;

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
 Tue Dec  7 12:24:44 2010
@@ -40,7 +40,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -227,7 +227,7 @@ public class DirectExchange extends Abst
         }
     }
 
-    public void route(IncomingMessage payload) throws AMQException
+    public void route(InboundMessage payload) throws AMQException
     {
 
         final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey();

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
 Tue Dec  7 12:24:44 2010
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 
+import org.apache.qpid.server.queue.InboundMessage;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -54,7 +55,7 @@ public interface Exchange
 
     void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
 
-    void route(IncomingMessage message) throws AMQException;
+    void route(InboundMessage message) throws AMQException;
 
 
     /**

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
 Tue Dec  7 12:24:44 2010
@@ -28,7 +28,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -227,7 +227,7 @@ public class FanoutExchange extends Abst
         }
     }
 
-    public void route(IncomingMessage payload) throws AMQException
+    public void route(InboundMessage payload) throws AMQException
     {
 
     

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
 Tue Dec  7 12:24:44 2010
@@ -33,6 +33,7 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.queue.InboundMessage;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -286,7 +287,7 @@ public class HeadersExchange extends Abs
         }
     }
 
-    public void route(IncomingMessage payload) throws AMQException
+    public void route(InboundMessage payload) throws AMQException
     {
         FieldTable headers = getHeaders(payload.getContentHeaderBody());
         if (_logger.isDebugEnabled())

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
 Tue Dec  7 12:24:44 2010
@@ -30,6 +30,7 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortStringTokenizer;
+import org.apache.qpid.server.queue.InboundMessage;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -265,7 +266,7 @@ public class TopicExchange extends Abstr
             _filteredQueues.put(queue,newFilters);
         }
 
-        public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
+        public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues)
         {
             if(queues == null)
             {
@@ -501,7 +502,7 @@ public class TopicExchange extends Abstr
 
         if(selectorRef == null || (selector = selectorRef.get())==null)
         {
-            selector = new JMSSelectorFilter<RuntimeException>(selectorString);
+            selector = new JMSSelectorFilter(selectorString);
             _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
         }
         return selector;
@@ -563,7 +564,7 @@ public class TopicExchange extends Abstr
         return normalizedString;
     }
 
-    public void route(IncomingMessage payload) throws AMQException
+    public void route(InboundMessage payload) throws AMQException
     {
 
         final AMQShortString routingKey = payload.getRoutingKey();
@@ -681,7 +682,7 @@ public class TopicExchange extends Abstr
         }
     }
 
-    private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
+    private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
     {
 
         Collection<TopicMatcherResult> results = _parser.parse(routingKey);

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
 Tue Dec  7 12:24:44 2010
@@ -59,7 +59,6 @@ public class BasicRejectMethodHandler im
         {
             _logger.debug("Rejecting:" + body.getDeliveryTag() +
                           ": Requeue:" + body.getRequeue() +
-                          //": Resend:" + evt.getMethod().resend +
                           " on channel:" + channel.debugIdentity());
         }
 
@@ -70,26 +69,23 @@ public class BasicRejectMethodHandler im
         if (message == null)
         {
             _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
-//            throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
         }                 
         else
         {
             if (message.isQueueDeleted())
             {
-                _logger.warn("Message's Queue as already been purged, unable to Reject. " +
-                             "Dropping message should use Dead Letter Queue");
+                _logger.warn("Message's Queue has already been purged, dropping message");
                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
                 if(message != null)
                 {
                     message.discard(channel.getStoreContext());
                 }
-                //sendtoDeadLetterQueue(msg)
                 return;
             }
 
             if (!message.getMessage().isReferenced())
             {
-                _logger.warn("Message as already been purged, unable to Reject.");
+                _logger.warn("Message has already been purged, unable to Reject.");
                 return;
             }
 
@@ -98,15 +94,10 @@ public class BasicRejectMethodHandler im
             {
                 _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
                               ": Requeue:" + body.getRequeue() +
-                              //": Resend:" + evt.getMethod().resend +
                               " on channel:" + channel.debugIdentity());
             }
 
-            // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
-            //if (!evt.getMethod().resend)
-            {
-                message.reject();
-            }
+            message.reject();
 
             if (body.getRequeue())
             {
@@ -114,11 +105,7 @@ public class BasicRejectMethodHandler im
             }
             else
             {
-                _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
-                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
-                //sendtoDeadLetterQueue(AMQMessage message)
-//                message.queue = channel.getDefaultDeadLetterQueue();
-//                channel.requeue(deliveryTag);
+                channel.deadLetter(body.getDeliveryTag());
             }
         }
     }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Tue Dec  7 12:24:44 2010
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -41,7 +42,7 @@ import java.util.concurrent.atomic.Atomi
 /**
  * A deliverable message.
  */
-public class AMQMessage implements Filterable<AMQException>
+public class AMQMessage implements Filterable
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -273,7 +274,10 @@ public class AMQMessage implements Filte
         return _messageHandle.getContentHeaderBody(getStoreContext());
     }
 
-
+    public AMQShortString getRoutingKey() throws AMQException
+    {
+        return getMessagePublishInfo().getRoutingKey();
+    }
 
     public Long getMessageId()
     {
@@ -373,7 +377,7 @@ public class AMQMessage implements Filte
         return (_flags & DELIVERED_TO_CONSUMER) != 0;
     }
 
-    public boolean isPersistent() throws AMQException
+    public boolean isPersistent()
     {
         return _messageHandle.isPersistent();
     }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Tue Dec  7 12:24:44 2010
@@ -95,7 +95,7 @@ public interface AMQQueue extends Managa
 
     QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
 
-    void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
+    void requeue(QueueEntry entry) throws AMQException;
 
     void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
 
@@ -248,4 +248,8 @@ public interface AMQQueue extends Managa
     void configure(QueueConfiguration config);
     
     ManagedObject getManagedObject();
+
+    Exchange getAlternateExchange();
+    
+    void setAlternateExchange(Exchange exchange);
 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
 Tue Dec  7 12:24:44 2010
@@ -21,17 +21,23 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.Map;
-import java.util.HashMap;
-
-
 public class AMQQueueFactory
 {
+    public static final boolean CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED 
= true;//TODO: take from queue configuration
+    public static final AMQShortString DLQ_ROUTING_KEY = new 
AMQShortString("dlq");
+    public static final AMQShortString X_QPID_DLQ_ENABLED = new 
AMQShortString("x-qpid-dlq-enabled");
+    public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
     public static final AMQShortString X_QPID_PRIORITIES = new 
AMQShortString("x-qpid-priorities");
     private static final AMQShortString QPID_LAST_VALUE_QUEUE = new 
AMQShortString ("qpid.last_value_queue");
     private static final AMQShortString QPID_LAST_VALUE_QUEUE_KEY = new 
AMQShortString("qpid.last_value_queue_key");
@@ -147,7 +153,6 @@ public class AMQQueueFactory
             }
         }
 
-
         AMQQueue q = null;
         if(conflationKey != null)
         {
@@ -164,7 +169,8 @@ public class AMQQueueFactory
 
         //Register the new queue
         virtualHost.getQueueRegistry().registerQueue(q);
-        
q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+        QueueConfiguration qConfig = 
virtualHost.getConfiguration().getQueueConfiguration(name.asString());
+        q.configure(qConfig);
 
         if(arguments != null)
         {
@@ -177,6 +183,74 @@ public class AMQQueueFactory
             }
         }
 
+        boolean dlqArgPresent = (arguments != null && 
(arguments.containsKey(X_QPID_DLQ_ENABLED)));
+
+        if(dlqArgPresent || CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED)
+        {
+            //verify that the argument isn't explicitly disabling DLQ for this queue.
+            boolean dlqEnabled = true;
+            if(dlqArgPresent)
+            {
+                dlqEnabled = arguments.getBoolean(X_QPID_DLQ_ENABLED);
+            }
+
+            //feature is not to be enabled for temporary queues or when explicitly disabled by argument
+            if(!q.isAutoDelete() && dlqEnabled) 
+            {
+                ServerConfiguration serverConfig = 
ApplicationRegistry.getInstance().getConfiguration();
+                AMQShortString dlExchangeName = new AMQShortString(name + 
serverConfig.getDeadLetterExchangeSuffix());
+                AMQShortString dlQueueName = new AMQShortString(name + 
serverConfig.getDeadLetterQueueSuffix());
+
+                ExchangeRegistry exchangeRegistry = 
virtualHost.getExchangeRegistry();
+                ExchangeFactory exchangeFactory = 
virtualHost.getExchangeFactory();
+                QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+                Exchange dlExchange = null;
+                synchronized(exchangeRegistry)
+                {
+                    dlExchange = exchangeRegistry.getExchange(dlExchangeName);
+
+                    if(dlExchange == null)
+                    {
+                        dlExchange = 
exchangeFactory.createExchange(dlExchangeName, 
+                                ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, 
false, 0);
+
+                        exchangeRegistry.registerExchange(dlExchange);
+
+                        //enter the dle in the persistent store
+                        
virtualHost.getMessageStore().createExchange(dlExchange);
+                    }
+                }
+
+                AMQQueue dlQueue = null;
+                synchronized(queueRegistry)
+                {
+                    dlQueue = queueRegistry.getQueue(dlQueueName);
+
+                    if(dlQueue == null)
+                    {
+                        //set args to disable DLQ'ing from the DLQ itself, preventing loops etc
+                        FieldTable args = new FieldTable();
+                        args.setBoolean(X_QPID_DLQ_ENABLED, false);
+                        
+                        dlQueue = createAMQQueueImpl(dlQueueName, true, owner, 
false, virtualHost, args);
+
+                        //enter the dlq in the persistent store
+                        virtualHost.getMessageStore().createQueue(dlQueue, 
args);
+                    }
+                }
+
+                //ensure the queue is bound to the exchange
+                if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
+                {
+                    dlQueue.bind(dlExchange, DLQ_ROUTING_KEY, null);
+                }
+                
+                q.setAlternateExchange(dlExchange);
+            }
+
+        }
+        
         return q;
     }
 
@@ -212,6 +286,15 @@ public class AMQQueueFactory
             arguments.setString(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() 
== null ? QPID_LVQ_KEY : config.getLVQKey());
 
         }
+        if (!config.getAutoDelete() && 
CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED)
+        {
+            if(arguments == null)
+            {
+                arguments = new FieldTable();
+            }
+            
+            arguments.setBoolean(X_QPID_DLQ_ENABLED, true);
+        }
 
         AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, 
host, arguments);
         q.configure(config);

Added: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java?rev=1042999&view=auto
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java
 (added)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java
 Tue Dec  7 12:24:44 2010
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.ArrayList;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+public interface InboundMessage extends Filterable<RuntimeException>
+{
+    AMQShortString getRoutingKey() throws AMQException;
+
+    Long getMessageId();
+
+    void enqueue(ArrayList<AMQQueue> queues);
+}
\ No newline at end of file

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
 Tue Dec  7 12:24:44 2010
@@ -35,9 +35,8 @@ import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Collection;
 
-public class IncomingMessage implements Filterable<RuntimeException>
+public class IncomingMessage implements InboundMessage
 {
 
     /** Used for debugging purposes. */
@@ -279,7 +278,6 @@ public class IncomingMessage implements 
         return _contentHeaderBody;
     }
 
-
     public boolean isPersistent()
     {
         return getContentHeaderBody().properties instanceof 
BasicContentHeaderProperties &&

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 Tue Dec  7 12:24:44 2010
@@ -171,7 +171,7 @@ public interface QueueEntry extends Comp
 
     boolean isRejectedBy(Subscription subscription);
 
-    void requeue(StoreContext storeContext) throws AMQException;
+    void requeue() throws AMQException;
 
     void dequeue(final StoreContext storeContext) throws 
FailedDequeueException;
 

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 Tue Dec  7 12:24:44 2010
@@ -260,9 +260,9 @@ public class QueueEntryImpl implements Q
     }
 
 
-    public void requeue(final StoreContext storeContext) throws AMQException
+    public void requeue() throws AMQException
     {
-        getQueue().requeue(storeContext, this);
+        getQueue().requeue(this);
         if(_stateChangeListeners != null)
         {
             notifyStateChange(QueueEntry.State.ACQUIRED, 
QueueEntry.State.AVAILABLE);

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Tue Dec  7 12:24:44 2010
@@ -129,6 +129,8 @@ public class SimpleAMQQueue implements A
     private long _flowResumeCapacity = 
ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
     private final AtomicBoolean _overfull = new AtomicBoolean(false);
 
+    private Exchange _alternateExchange;
+
     protected SimpleAMQQueue(AMQShortString name, boolean durable, 
AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
     {
@@ -554,7 +556,7 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public void requeue(StoreContext storeContext, QueueEntry entry) throws 
AMQException
+    public void requeue(QueueEntry entry) throws AMQException
     {
 
         SubscriptionList.SubscriptionNodeIterator subscriberIter = 
_subscriptionList.iterator();
@@ -1812,4 +1814,14 @@ public class SimpleAMQQueue implements A
     {
         return String.valueOf(getName());
     }
+
+    public Exchange getAlternateExchange()
+    {
+        return _alternateExchange;
+    }
+    
+    public void setAlternateExchange(Exchange exchange)
+    {
+        _alternateExchange = exchange;
+    }
 }

Added: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java?rev=1042999&view=auto
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
 (added)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
 Tue Dec  7 12:24:44 2010
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+
+/** A transactional context that only supports local transactions
+ * for use in DeadLetterQueue processing */
+public class DLQTransactionalContext extends LocalTransactionalContext
+{
+    private final StoreContext _storeContext;
+
+    public DLQTransactionalContext(final AMQChannel channel, final 
StoreContext storeContext)
+    {
+        super(channel);
+        _storeContext = storeContext;
+    }
+
+    @Override
+    public void deliver(final AMQQueue queue, AMQMessage message) throws 
AMQException
+    {
+        //TODO: ensure message is not Immediate. Copy the message object if necessary.
+        deliver(queue, message, true);
+    }
+
+    @Override
+    public StoreContext getStoreContext()
+    {
+        return _storeContext;
+    }
+}
\ No newline at end of file

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
 Tue Dec  7 12:24:44 2010
@@ -76,7 +76,7 @@ public class LocalTransactionalContext i
 
         public void process() throws AMQException
         {
-            entry.requeue(getStoreContext());
+            entry.requeue();
         }
     }
 
@@ -84,11 +84,13 @@ public class LocalTransactionalContext i
     {
         private final AMQQueue _queue;
         private final AMQMessage _message;
+        private final boolean _enqueueOnly;
 
-        public PublishAction(final AMQQueue queue, final AMQMessage message)
+        public PublishAction(final AMQQueue queue, final AMQMessage message, 
boolean enqueueOnly)
         {
             _queue = queue;
             _message = message;
+            _enqueueOnly = enqueueOnly;
         }
 
         public void process() throws AMQException
@@ -98,11 +100,14 @@ public class LocalTransactionalContext i
             try
             {
                 QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
-                _queue.checkCapacity(_channel);
-
-                if(entry.immediateAndNotDelivered())
+                if(!_enqueueOnly)
                 {
-                    getReturnMessages().add(new 
NoConsumersException(_message));
+                    _queue.checkCapacity(_channel);
+
+                    if(entry.immediateAndNotDelivered())
+                    {
+                        getReturnMessages().add(new 
NoConsumersException(_message));
+                    }
                 }
             }
             finally
@@ -151,24 +156,26 @@ public class LocalTransactionalContext i
 
     public void deliver(final AMQQueue queue, AMQMessage message) throws 
AMQException
     {
+        deliver(queue, message, false);
+    }
+    
+    protected void deliver(final AMQQueue queue, AMQMessage message, boolean 
enqueueOnly) throws AMQException
+    {
         // A publication will result in the enlisting of several
         // TxnOps. The first is an op that will store the message.
         // Following that (and ordering is important), an op will
         // be added for every queue onto which the message is
         // enqueued.
-        _postCommitDeliveryList.add(new PublishAction(queue, message));
+        _postCommitDeliveryList.add(new PublishAction(queue, message, 
enqueueOnly));
         _messageDelivered = true;
-
     }
 
     public void requeue(QueueEntry entry) throws AMQException
     {
         _postCommitDeliveryList.add(new RequeueAction(entry));
         _messageDelivered = true;
-
     }
 
-
     private void checkAck(long deliveryTag, UnacknowledgedMessageMap 
unacknowledgedMessageMap) throws AMQException
     {
         if (!unacknowledgedMessageMap.contains(deliveryTag))
@@ -255,10 +262,10 @@ public class LocalTransactionalContext i
 
         if (_ackOp != null)
         {
-
+            //there are unacknowledged messages to commit delivery of to the client
             _messageDelivered = true;
             _ackOp.consolidate();
-            // already enlisted, after commit will reset regardless of outcome
+            // ackOp has already enlisted in the txnBuffer, after commit will reset regardless of outcome
             _ackOp = null;
         }
 
@@ -293,7 +300,7 @@ public class LocalTransactionalContext i
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug("Performing post commit delivery");
+            _log.debug("Beginning post commit delivery");
         }
 
         try
@@ -306,6 +313,7 @@ public class LocalTransactionalContext i
         finally
         {
             _postCommitDeliveryList.clear();
+            _log.debug("Completed post commit delivery");
         }
     }
 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 Tue Dec  7 12:24:44 2010
@@ -115,7 +115,7 @@ public class NonTransactionalContext imp
 
     public void requeue(QueueEntry entry) throws AMQException
     {
-        entry.requeue(_storeContext);
+        entry.requeue();
     }
 
     public void acknowledgeMessage(final long deliveryTag, long 
lastDeliveryTag,

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
 Tue Dec  7 12:24:44 2010
@@ -349,14 +349,7 @@ public class Show extends AbstractComman
 
             arrival.add("" + msg.getArrivalTime());
 
-            try
-            {
-                ispersitent.add(msg.isPersistent() ? "true" : "false");
-            }
-            catch (AMQException e)
-            {
-                ispersitent.add("n/a");
-            }
+            ispersitent.add(msg.isPersistent() ? "true" : "false");
 
             isredelivered.add(msg.isRedelivered() ? "true" : "false");
 

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
 Tue Dec  7 12:24:44 2010
@@ -367,7 +367,7 @@ public class AbstractHeadersExchangeTest
                     return false;  //To change body of implemented methods use 
File | Settings | File Templates.
                 }
 
-                public void requeue(StoreContext storeContext) throws 
AMQException
+                public void requeue() throws AMQException
                 {
                     //To change body of implemented methods use File | 
Settings | File Templates.
                 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
 Tue Dec  7 12:24:44 2010
@@ -167,7 +167,7 @@ public class MockAMQQueue implements AMQ
         return null;  //To change body of implemented methods use File | 
Settings | File Templates.
     }
 
-    public void requeue(StoreContext storeContext, QueueEntry entry) throws 
AMQException
+    public void requeue(QueueEntry entry) throws AMQException
     {
         //To change body of implemented methods use File | Settings | File 
Templates.
     }
@@ -376,4 +376,14 @@ public class MockAMQQueue implements AMQ
     {
         return false;
     }
+
+    public Exchange getAlternateExchange()
+    {
+        return null;
+    }
+
+    public void setAlternateExchange(Exchange exchange)
+    {
+
+    }
 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
 Tue Dec  7 12:24:44 2010
@@ -167,7 +167,7 @@ public class MockQueueEntry implements Q
     }
 
     
-    public void requeue(StoreContext storeContext) throws AMQException
+    public void requeue() throws AMQException
     {
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to