Author: robbie
Date: Tue Dec 7 12:24:44 2010
New Revision: 1042999
URL: http://svn.apache.org/viewvc?rev=1042999&view=rev
Log:
QPID-2973: broker support for rejecting messages without requeue, and
creating+using DLQs
Added:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
Tue Dec 7 12:24:44 2010
@@ -34,7 +34,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.AbstractExchange;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.junit.extensions.util.SizeOf;
@@ -204,7 +204,7 @@ public class DiagnosticExchange extends
return false;
}
- public void route(IncomingMessage payload) throws AMQException
+ public void route(InboundMessage payload) throws AMQException
{
Long value = new Long(SizeOf.getUsedMemory());
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
Tue Dec 7 12:24:44 2010
@@ -28,7 +28,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -102,7 +102,7 @@ public class TestExchange implements Exc
{
}
- public void route(IncomingMessage message) throws AMQException
+ public void route(InboundMessage message) throws AMQException
{
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Tue Dec 7 12:24:44 2010
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -44,12 +45,14 @@ import org.apache.qpid.server.flow.FlowC
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.DLQTransactionalContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -1092,4 +1095,75 @@ public class AMQChannel
}
}
}
+
+ public void deadLetter(long deliveryTag) throws AMQException
+ {
+ UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
+ QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+
+ if (rejectedQueueEntry == null)
+ {
+ _log.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
+ return;
+ }
+ else
+ {
+ AMQMessage msg = rejectedQueueEntry.getMessage();
+
+ AMQQueue queue = rejectedQueueEntry.getQueue();
+ Exchange altExchange = queue.getAlternateExchange();
+
+ //TODO:remove below line, its temporary for some noddy testing only
+// altExchange = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getExchangeRegistry().getExchange(new AMQShortString("dle.test"));
+ if (altExchange == null)
+ {
+ _log.warn("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ rejectedQueueEntry.discard(new StoreContext());
+ return;
+ }
+
+ InboundMessageAdapter adapter = new InboundMessageAdapter(msg);
+ altExchange.route(adapter);
+
+ ArrayList<AMQQueue> destinationQueues = adapter.getEnqueuedList();
+ if (destinationQueues == null || destinationQueues.isEmpty())
+ {
+ _log.warn("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
+ rejectedQueueEntry.discard(new StoreContext());
+ return;
+ }
+
+ //increment the message reference count to include the new queue(s)
+ msg.incrementReference(destinationQueues.size());
+
+ //create a new storeContext to use with a new TransactionContext for the DLQ process
+ StoreContext dlqStoreContext = new StoreContext("Session: " + _session.getClientIdentifier() + "; channel: " + _channelId + "; DLQ deliveryTag: " + deliveryTag);
+ DLQTransactionalContext dlqTxnContext = new DLQTransactionalContext(this, dlqStoreContext);
+
+ //enqueue the message on the new queues in the store if its persistent
+ if (msg.isPersistent())
+ {
+ MessageStore store = getMessageStore();
+
+ for (int i = 0; i < destinationQueues.size(); i++)
+ {
+ store.enqueueMessage(dlqStoreContext, destinationQueues.get(i), msg.getMessageId());
+ }
+ }
+
+ //TODO: ensure the AMQMessage used is NOT marked IMMEDIATE, to prevent it not being enqueued
+
+ //configure the txn context to ack consumption from old queue upon commit
+ unackedMap.acknowledgeMessage(deliveryTag, false, dlqTxnContext);
+
+ //configure the txn context to deliver to the new queues following commit
+ for (int i = 0; i < destinationQueues.size(); i++)
+ {
+ dlqTxnContext.deliver(destinationQueues.get(i), msg);
+ }
+
+ dlqTxnContext.commit();
+
+ }
+ }
}
Added:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java?rev=1042999&view=auto
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java
(added)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/InboundMessageAdapter.java
Tue Dec 7 12:24:44 2010
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.InboundMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+
+public class InboundMessageAdapter implements InboundMessage
+{
+ private AMQMessage _message;
+ private ArrayList<AMQQueue> _queues;
+
+ public InboundMessageAdapter(AMQMessage msg)
+ {
+ _message = msg;
+ }
+
+ public Long getMessageId()
+ {
+ return _message.getMessageId();
+ }
+
+ public boolean isPersistent()
+ {
+ return _message.isPersistent();
+ }
+
+ public boolean isRedelivered()
+ {
+ return _message.isRedelivered();
+ }
+
+ public AMQShortString getRoutingKey() throws AMQException
+ {
+ return _message.getRoutingKey();
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ try
+ {
+ return _message.getContentHeaderBody();
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Error retrieving ContentHeaderBody: " + e, e);
+ }
+ }
+
+ public void enqueue(ArrayList<AMQQueue> queues)
+ {
+ _queues = queues;
+ }
+
+ public ArrayList<AMQQueue> getEnqueuedList()
+ {
+ return _queues;
+ }
+}
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
Tue Dec 7 12:24:44 2010
@@ -36,6 +36,8 @@ import org.apache.commons.configuration.
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import
org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -694,4 +696,20 @@ public class ServerConfiguration impleme
{
return getConfig().getBoolean("statistics.reporting.reset", false);
}
+
+ /**
+ * String to affix to end of queue name when generating an alternate exchange for DLQ purposes.
+ */
+ public String getDeadLetterExchangeSuffix()
+ {
+ return getConfig().getString("deadLetterExchangeSuffix", DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ }
+
+ /**
+ * String to affix to end of queue name when generating a queue for DLQ purposes.
+ */
+ public String getDeadLetterQueueSuffix()
+ {
+ return getConfig().getString("deadLetterQueueSuffix", AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX);
+ }
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
Tue Dec 7 12:24:44 2010
@@ -36,6 +36,7 @@ import org.apache.qpid.server.virtualhos
public class DefaultExchangeFactory implements ExchangeFactory
{
private static final Logger _logger =
Logger.getLogger(DefaultExchangeFactory.class);
+ public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
private Map<AMQShortString, ExchangeType<? extends Exchange>>
_exchangeClassMap = new HashMap<AMQShortString, ExchangeType<? extends
Exchange>>();
private final VirtualHost _host;
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
Tue Dec 7 12:24:44 2010
@@ -40,7 +40,7 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -227,7 +227,7 @@ public class DirectExchange extends Abst
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public void route(InboundMessage payload) throws AMQException
{
final AMQShortString routingKey = payload.getRoutingKey() == null ?
AMQShortString.EMPTY_STRING : payload.getRoutingKey();
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
Tue Dec 7 12:24:44 2010
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -54,7 +55,7 @@ public interface Exchange
void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable
args) throws AMQException;
- void route(IncomingMessage message) throws AMQException;
+ void route(InboundMessage message) throws AMQException;
/**
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
Tue Dec 7 12:24:44 2010
@@ -28,7 +28,7 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -227,7 +227,7 @@ public class FanoutExchange extends Abst
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public void route(InboundMessage payload) throws AMQException
{
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
Tue Dec 7 12:24:44 2010
@@ -33,6 +33,7 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -286,7 +287,7 @@ public class HeadersExchange extends Abs
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public void route(InboundMessage payload) throws AMQException
{
FieldTable headers = getHeaders(payload.getContentHeaderBody());
if (_logger.isDebugEnabled())
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
Tue Dec 7 12:24:44 2010
@@ -30,6 +30,7 @@ import org.apache.qpid.exchange.Exchange
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortStringTokenizer;
+import org.apache.qpid.server.queue.InboundMessage;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -265,7 +266,7 @@ public class TopicExchange extends Abstr
_filteredQueues.put(queue,newFilters);
}
- public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
+ public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues)
{
if(queues == null)
{
@@ -501,7 +502,7 @@ public class TopicExchange extends Abstr
if(selectorRef == null || (selector = selectorRef.get())==null)
{
- selector = new JMSSelectorFilter<RuntimeException>(selectorString);
+ selector = new JMSSelectorFilter(selectorString);
_selectorCache.put(selectorString, new
WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
}
return selector;
@@ -563,7 +564,7 @@ public class TopicExchange extends Abstr
return normalizedString;
}
- public void route(IncomingMessage payload) throws AMQException
+ public void route(InboundMessage payload) throws AMQException
{
final AMQShortString routingKey = payload.getRoutingKey();
@@ -681,7 +682,7 @@ public class TopicExchange extends Abstr
}
}
- private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
+ private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
Tue Dec 7 12:24:44 2010
@@ -59,7 +59,6 @@ public class BasicRejectMethodHandler im
{
_logger.debug("Rejecting:" + body.getDeliveryTag() +
": Requeue:" + body.getRequeue() +
- //": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
}
@@ -70,26 +69,23 @@ public class BasicRejectMethodHandler im
if (message == null)
{
_logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
-// throw evt.getMethod().getChannelException(AMQConstant.NOT_FOUND, "Delivery Tag(" + deliveryTag + ")not known");
}
else
{
if (message.isQueueDeleted())
{
- _logger.warn("Message's Queue as already been purged, unable to Reject. " +
- "Dropping message should use Dead Letter Queue");
+ _logger.warn("Message's Queue has already been purged, dropping message");
message =
channel.getUnacknowledgedMessageMap().remove(deliveryTag);
if(message != null)
{
message.discard(channel.getStoreContext());
}
- //sendtoDeadLetterQueue(msg)
return;
}
if (!message.getMessage().isReferenced())
{
- _logger.warn("Message as already been purged, unable to Reject.");
+ _logger.warn("Message has already been purged, unable to Reject.");
return;
}
@@ -98,15 +94,10 @@ public class BasicRejectMethodHandler im
{
_logger.debug("Rejecting: DT:" + deliveryTag + "-" +
message.getMessage().debugIdentity() +
": Requeue:" + body.getRequeue() +
- //": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
}
- // If we haven't requested message to be resent to this consumer then reject it from ever getting it.
- //if (!evt.getMethod().resend)
- {
- message.reject();
- }
+ message.reject();
if (body.getRequeue())
{
@@ -114,11 +105,7 @@ public class BasicRejectMethodHandler im
}
else
{
- _logger.warn("Dropping message as requeue not required and there is no dead letter queue");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- //sendtoDeadLetterQueue(AMQMessage message)
-// message.queue = channel.getDefaultDeadLetterQueue();
-// channel.requeue(deliveryTag);
+ channel.deadLetter(body.getDeliveryTag());
}
}
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Tue Dec 7 12:24:44 2010
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -41,7 +42,7 @@ import java.util.concurrent.atomic.Atomi
/**
* A deliverable message.
*/
-public class AMQMessage implements Filterable<AMQException>
+public class AMQMessage implements Filterable
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -273,7 +274,10 @@ public class AMQMessage implements Filte
return _messageHandle.getContentHeaderBody(getStoreContext());
}
-
+ public AMQShortString getRoutingKey() throws AMQException
+ {
+ return getMessagePublishInfo().getRoutingKey();
+ }
public Long getMessageId()
{
@@ -373,7 +377,7 @@ public class AMQMessage implements Filte
return (_flags & DELIVERED_TO_CONSUMER) != 0;
}
- public boolean isPersistent() throws AMQException
+ public boolean isPersistent()
{
return _messageHandle.isPersistent();
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Tue Dec 7 12:24:44 2010
@@ -95,7 +95,7 @@ public interface AMQQueue extends Managa
QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws
AMQException;
- void requeue(StoreContext storeContext, QueueEntry entry) throws
AMQException;
+ void requeue(QueueEntry entry) throws AMQException;
void dequeue(StoreContext storeContext, QueueEntry entry) throws
FailedDequeueException;
@@ -248,4 +248,8 @@ public interface AMQQueue extends Managa
void configure(QueueConfiguration config);
ManagedObject getManagedObject();
+
+ Exchange getAlternateExchange();
+
+ void setAlternateExchange(Exchange exchange);
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
Tue Dec 7 12:24:44 2010
@@ -21,17 +21,23 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.Map;
-import java.util.HashMap;
-
-
public class AMQQueueFactory
{
+ public static final boolean CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED = true;//TODO: take from queue configuration
+ public static final AMQShortString DLQ_ROUTING_KEY = new AMQShortString("dlq");
+ public static final AMQShortString X_QPID_DLQ_ENABLED = new AMQShortString("x-qpid-dlq-enabled");
+ public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
public static final AMQShortString X_QPID_PRIORITIES = new
AMQShortString("x-qpid-priorities");
private static final AMQShortString QPID_LAST_VALUE_QUEUE = new
AMQShortString ("qpid.last_value_queue");
private static final AMQShortString QPID_LAST_VALUE_QUEUE_KEY = new
AMQShortString("qpid.last_value_queue_key");
@@ -147,7 +153,6 @@ public class AMQQueueFactory
}
}
-
AMQQueue q = null;
if(conflationKey != null)
{
@@ -164,7 +169,8 @@ public class AMQQueueFactory
//Register the new queue
virtualHost.getQueueRegistry().registerQueue(q);
- q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+ QueueConfiguration qConfig = virtualHost.getConfiguration().getQueueConfiguration(name.asString());
+ q.configure(qConfig);
if(arguments != null)
{
@@ -177,6 +183,74 @@ public class AMQQueueFactory
}
}
+ boolean dlqArgPresent = (arguments != null &&
(arguments.containsKey(X_QPID_DLQ_ENABLED)));
+
+ if(dlqArgPresent || CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED)
+ {
+ //verify that the argument isn't explicitly disabling DLQ for this queue.
+ boolean dlqEnabled = true;
+ if(dlqArgPresent)
+ {
+ dlqEnabled = arguments.getBoolean(X_QPID_DLQ_ENABLED);
+ }
+
+ //feature is not to be enabled for temporary queues or when explicitly disabled by argument
+ if(!q.isAutoDelete() && dlqEnabled)
+ {
+ ServerConfiguration serverConfig =
ApplicationRegistry.getInstance().getConfiguration();
+ AMQShortString dlExchangeName = new AMQShortString(name +
serverConfig.getDeadLetterExchangeSuffix());
+ AMQShortString dlQueueName = new AMQShortString(name +
serverConfig.getDeadLetterQueueSuffix());
+
+ ExchangeRegistry exchangeRegistry =
virtualHost.getExchangeRegistry();
+ ExchangeFactory exchangeFactory =
virtualHost.getExchangeFactory();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ Exchange dlExchange = null;
+ synchronized(exchangeRegistry)
+ {
+ dlExchange = exchangeRegistry.getExchange(dlExchangeName);
+
+ if(dlExchange == null)
+ {
+ dlExchange =
exchangeFactory.createExchange(dlExchangeName,
+ ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true,
false, 0);
+
+ exchangeRegistry.registerExchange(dlExchange);
+
+ //enter the dle in the persistent store
+
virtualHost.getMessageStore().createExchange(dlExchange);
+ }
+ }
+
+ AMQQueue dlQueue = null;
+ synchronized(queueRegistry)
+ {
+ dlQueue = queueRegistry.getQueue(dlQueueName);
+
+ if(dlQueue == null)
+ {
+ //set args to disable DLQ'ing from the DLQ itself, preventing loops etc
+ FieldTable args = new FieldTable();
+ args.setBoolean(X_QPID_DLQ_ENABLED, false);
+
+ dlQueue = createAMQQueueImpl(dlQueueName, true, owner,
false, virtualHost, args);
+
+ //enter the dlq in the persistent store
+ virtualHost.getMessageStore().createQueue(dlQueue,
args);
+ }
+ }
+
+ //ensure the queue is bound to the exchange
+ if(!dlExchange.isBound(DLQ_ROUTING_KEY, dlQueue))
+ {
+ dlQueue.bind(dlExchange, DLQ_ROUTING_KEY, null);
+ }
+
+ q.setAlternateExchange(dlExchange);
+ }
+
+ }
+
return q;
}
@@ -212,6 +286,15 @@ public class AMQQueueFactory
arguments.setString(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey()
== null ? QPID_LVQ_KEY : config.getLVQKey());
}
+ if (!config.getAutoDelete() &&
CONSTANT_THAT_NEEDS_REPLACED_IS_DLQ_CONFIGURED)
+ {
+ if(arguments == null)
+ {
+ arguments = new FieldTable();
+ }
+
+ arguments.setBoolean(X_QPID_DLQ_ENABLED, true);
+ }
AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete,
host, arguments);
q.configure(config);
Added:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java?rev=1042999&view=auto
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java
(added)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessage.java
Tue Dec 7 12:24:44 2010
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.ArrayList;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+public interface InboundMessage extends Filterable<RuntimeException>
+{
+ AMQShortString getRoutingKey() throws AMQException;
+
+ Long getMessageId();
+
+ void enqueue(ArrayList<AMQQueue> queues);
+}
\ No newline at end of file
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
Tue Dec 7 12:24:44 2010
@@ -35,9 +35,8 @@ import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
-public class IncomingMessage implements Filterable<RuntimeException>
+public class IncomingMessage implements InboundMessage
{
/** Used for debugging purposes. */
@@ -279,7 +278,6 @@ public class IncomingMessage implements
return _contentHeaderBody;
}
-
public boolean isPersistent()
{
return getContentHeaderBody().properties instanceof
BasicContentHeaderProperties &&
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Tue Dec 7 12:24:44 2010
@@ -171,7 +171,7 @@ public interface QueueEntry extends Comp
boolean isRejectedBy(Subscription subscription);
- void requeue(StoreContext storeContext) throws AMQException;
+ void requeue() throws AMQException;
void dequeue(final StoreContext storeContext) throws
FailedDequeueException;
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Tue Dec 7 12:24:44 2010
@@ -260,9 +260,9 @@ public class QueueEntryImpl implements Q
}
- public void requeue(final StoreContext storeContext) throws AMQException
+ public void requeue() throws AMQException
{
- getQueue().requeue(storeContext, this);
+ getQueue().requeue(this);
if(_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED,
QueueEntry.State.AVAILABLE);
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Tue Dec 7 12:24:44 2010
@@ -129,6 +129,8 @@ public class SimpleAMQQueue implements A
private long _flowResumeCapacity =
ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
private final AtomicBoolean _overfull = new AtomicBoolean(false);
+ private Exchange _alternateExchange;
+
protected SimpleAMQQueue(AMQShortString name, boolean durable,
AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -554,7 +556,7 @@ public class SimpleAMQQueue implements A
}
}
- public void requeue(StoreContext storeContext, QueueEntry entry) throws
AMQException
+ public void requeue(QueueEntry entry) throws AMQException
{
SubscriptionList.SubscriptionNodeIterator subscriberIter =
_subscriptionList.iterator();
@@ -1812,4 +1814,14 @@ public class SimpleAMQQueue implements A
{
return String.valueOf(getName());
}
+
+ public Exchange getAlternateExchange()
+ {
+ return _alternateExchange;
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+ _alternateExchange = exchange;
+ }
}
Added:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java?rev=1042999&view=auto
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
(added)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DLQTransactionalContext.java
Tue Dec 7 12:24:44 2010
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+
+/** A transactional context that only supports local transactions
+ * for use in DeadLetterQueue processing */
+public class DLQTransactionalContext extends LocalTransactionalContext
+{
+ private final StoreContext _storeContext;
+
+ public DLQTransactionalContext(final AMQChannel channel, final
StoreContext storeContext)
+ {
+ super(channel);
+ _storeContext = storeContext;
+ }
+
+ @Override
+ public void deliver(final AMQQueue queue, AMQMessage message) throws
AMQException
+ {
+ //TODO: ensure message is not Immediate. Copy the message object if necessary.
+ deliver(queue, message, true);
+ }
+
+ @Override
+ public StoreContext getStoreContext()
+ {
+ return _storeContext;
+ }
+}
\ No newline at end of file
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
Tue Dec 7 12:24:44 2010
@@ -76,7 +76,7 @@ public class LocalTransactionalContext i
public void process() throws AMQException
{
- entry.requeue(getStoreContext());
+ entry.requeue();
}
}
@@ -84,11 +84,13 @@ public class LocalTransactionalContext i
{
private final AMQQueue _queue;
private final AMQMessage _message;
+ private final boolean _enqueueOnly;
- public PublishAction(final AMQQueue queue, final AMQMessage message)
+ public PublishAction(final AMQQueue queue, final AMQMessage message,
boolean enqueueOnly)
{
_queue = queue;
_message = message;
+ _enqueueOnly = enqueueOnly;
}
public void process() throws AMQException
@@ -98,11 +100,14 @@ public class LocalTransactionalContext i
try
{
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
- _queue.checkCapacity(_channel);
-
- if(entry.immediateAndNotDelivered())
+ if(!_enqueueOnly)
{
- getReturnMessages().add(new
NoConsumersException(_message));
+ _queue.checkCapacity(_channel);
+
+ if(entry.immediateAndNotDelivered())
+ {
+ getReturnMessages().add(new
NoConsumersException(_message));
+ }
}
}
finally
@@ -151,24 +156,26 @@ public class LocalTransactionalContext i
public void deliver(final AMQQueue queue, AMQMessage message) throws
AMQException
{
+ deliver(queue, message, false);
+ }
+
+ protected void deliver(final AMQQueue queue, AMQMessage message, boolean
enqueueOnly) throws AMQException
+ {
// A publication will result in the enlisting of several
// TxnOps. The first is an op that will store the message.
// Following that (and ordering is important), an op will
// be added for every queue onto which the message is
// enqueued.
- _postCommitDeliveryList.add(new PublishAction(queue, message));
+ _postCommitDeliveryList.add(new PublishAction(queue, message,
enqueueOnly));
_messageDelivered = true;
-
}
public void requeue(QueueEntry entry) throws AMQException
{
_postCommitDeliveryList.add(new RequeueAction(entry));
_messageDelivered = true;
-
}
-
private void checkAck(long deliveryTag, UnacknowledgedMessageMap
unacknowledgedMessageMap) throws AMQException
{
if (!unacknowledgedMessageMap.contains(deliveryTag))
@@ -255,10 +262,10 @@ public class LocalTransactionalContext i
if (_ackOp != null)
{
-
+ //there are unacknowledged messages to commit delivery of to the client
_messageDelivered = true;
_ackOp.consolidate();
- // already enlisted, after commit will reset regardless of outcome
+ // ackOp has already enlisted in the txnBuffer, after commit will reset regardless of outcome
_ackOp = null;
}
@@ -293,7 +300,7 @@ public class LocalTransactionalContext i
{
if (_log.isDebugEnabled())
{
- _log.debug("Performing post commit delivery");
+ _log.debug("Beginning post commit delivery");
}
try
@@ -306,6 +313,7 @@ public class LocalTransactionalContext i
finally
{
_postCommitDeliveryList.clear();
+ _log.debug("Completed post commit delivery");
}
}
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
Tue Dec 7 12:24:44 2010
@@ -115,7 +115,7 @@ public class NonTransactionalContext imp
public void requeue(QueueEntry entry) throws AMQException
{
- entry.requeue(_storeContext);
+ entry.requeue();
}
public void acknowledgeMessage(final long deliveryTag, long
lastDeliveryTag,
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
Tue Dec 7 12:24:44 2010
@@ -349,14 +349,7 @@ public class Show extends AbstractComman
arrival.add("" + msg.getArrivalTime());
- try
- {
- ispersitent.add(msg.isPersistent() ? "true" : "false");
- }
- catch (AMQException e)
- {
- ispersitent.add("n/a");
- }
+ ispersitent.add(msg.isPersistent() ? "true" : "false");
isredelivered.add(msg.isRedelivered() ? "true" : "false");
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Tue Dec 7 12:24:44 2010
@@ -367,7 +367,7 @@ public class AbstractHeadersExchangeTest
return false; //To change body of implemented methods use
File | Settings | File Templates.
}
- public void requeue(StoreContext storeContext) throws
AMQException
+ public void requeue() throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Tue Dec 7 12:24:44 2010
@@ -167,7 +167,7 @@ public class MockAMQQueue implements AMQ
return null; //To change body of implemented methods use File |
Settings | File Templates.
}
- public void requeue(StoreContext storeContext, QueueEntry entry) throws
AMQException
+ public void requeue(QueueEntry entry) throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -376,4 +376,14 @@ public class MockAMQQueue implements AMQ
{
return false;
}
+
+ public Exchange getAlternateExchange()
+ {
+ return null;
+ }
+
+ public void setAlternateExchange(Exchange exchange)
+ {
+
+ }
}
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1042999&r1=1042998&r2=1042999&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
Tue Dec 7 12:24:44 2010
@@ -167,7 +167,7 @@ public class MockQueueEntry implements Q
}
- public void requeue(StoreContext storeContext) throws AMQException
+ public void requeue() throws AMQException
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]