Added:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java?rev=1631137&view=auto
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java
(added)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java
Sat Oct 11 23:46:39 2014
@@ -0,0 +1,1484 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.AccessRequestOkBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.BasicGetEmptyBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MessagePublishInfo;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.UnknownConfiguredObjectException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class ChannelMethodProcessorImpl implements ChannelMethodProcessor
+{
+    /** Logger shared by every instance of this processor. */
+    private static final Logger _logger = Logger.getLogger(ChannelMethodProcessorImpl.class);
+
+    /** Channel on whose behalf the received methods are executed. */
+    private final AMQChannel _channel;
+
+    /** Connection that the channel reports as its owner; used to write frames and look up the virtual host. */
+    private final AMQProtocolEngine _connection;
+
+    /**
+     * Creates a processor bound to one channel.
+     *
+     * @param channel the channel to operate on; its connection is read once here and cached
+     */
+    public ChannelMethodProcessorImpl(final AMQChannel channel)
+    {
+        _channel = channel;
+        _connection = channel.getConnection();
+    }
+
+    /**
+     * Handles Access.Request. On a 0-9-1 connection the method is rejected by closing the
+     * connection with COMMAND_INVALID; otherwise an Access.RequestOk carrying ticket 0 is written.
+     * None of the request flags are inspected.
+     */
+    @Override
+    public void receiveAccessRequest(final AMQShortString realm,
+                                     final boolean exclusive,
+                                     final boolean passive,
+                                     final boolean active,
+                                     final boolean write,
+                                     final boolean read)
+    {
+        final MethodRegistry registry = _connection.getMethodRegistry();
+
+        if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()))
+        {
+            closeConnection(AMQConstant.COMMAND_INVALID,
+                            "AccessRequest not present in AMQP versions other than 0-8, 0-9");
+            return;
+        }
+
+        // The access control class is not implemented; clients that expect it
+        // are kept happy by always handing back the "0" ticket.
+        final AccessRequestOkBody okBody = registry.createAccessRequestOkBody(0);
+        _channel.sync();
+        _connection.writeFrame(okBody.generateFrame(getChannelId()));
+    }
+
+    /**
+     * Handles Basic.Ack by passing the delivery tag and the multiple flag straight to
+     * the channel's acknowledgeMessage; no reply frame is written here.
+     *
+     * @param deliveryTag tag supplied by the client
+     * @param multiple    the client's multiple flag, forwarded unchanged
+     */
+    @Override
+    public void receiveBasicAck(final long deliveryTag, final boolean multiple)
+    {
+        _channel.acknowledgeMessage(deliveryTag, multiple);
+    }
+
+    /**
+     * Handles Basic.Cancel: unsubscribes the consumer identified by the tag and, unless the
+     * client asked for nowait, writes a Basic.CancelOk for the same tag.
+     *
+     * @param consumerTag tag of the consumer to remove
+     * @param nowait      when true no CancelOk is written
+     */
+    @Override
+    public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait)
+    {
+        _channel.unsubscribeConsumer(consumerTag);
+        if (nowait)
+        {
+            return;
+        }
+
+        final BasicCancelOkBody okBody = _connection.getMethodRegistry().createBasicCancelOkBody(consumerTag);
+        _channel.sync();
+        _connection.writeFrame(okBody.generateFrame(getChannelId()));
+    }
+
+    /**
+     * Handles Basic.Consume.
+     * <p>
+     * The source is the named queue, or the channel's default queue when no name is given. If
+     * neither resolves, the virtual host context value "qpid.enableMultiQueueConsumers" is true
+     * and the arguments hold a collection under "x-multiqueue", every non-blank entry of that
+     * collection is resolved as a message source; a single unresolved entry discards them all.
+     * <p>
+     * With no usable source the channel is closed with NOT_FOUND (a name or multi-queue list was
+     * given) or the connection is closed with NOT_ALLOWED (nothing was given). Otherwise the
+     * consumer is registered and, unless nowait is set, a Basic.ConsumeOk with the effective tag
+     * is written. Each refusal raised during registration closes the connection.
+     *
+     * @param queueNameStr queue to consume from, or null for the channel's default queue
+     * @param consumerTag  requested tag; replaced by the tag returned from the channel
+     * @param noLocal      forwarded to the channel
+     * @param noAck        inverted and forwarded as the "acks required" flag
+     * @param exclusive    forwarded to the channel
+     * @param nowait       when true no ConsumeOk is written
+     * @param arguments    consumer arguments; may be null
+     */
+    @Override
+    public void receiveBasicConsume(final AMQShortString queueNameStr,
+                                    AMQShortString consumerTag,
+                                    final boolean noLocal,
+                                    final boolean noAck,
+                                    final boolean exclusive,
+                                    final boolean nowait,
+                                    final FieldTable arguments)
+    {
+        VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost();
+        _channel.sync();
+        String queueName = queueNameStr == null ? null : queueNameStr.asString();
+
+        MessageSource queue = queueName == null ? _channel.getDefaultQueue() : vHost.getQueue(queueName);
+        final Collection<MessageSource> sources = new HashSet<>();
+        if (queue != null)
+        {
+            sources.add(queue);
+        }
+        // Boolean.TRUE.equals avoids a NullPointerException from auto-unboxing should the
+        // context value be absent.
+        else if (Boolean.TRUE.equals(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers"))
+                 && arguments != null
+                 && arguments.get("x-multiqueue") instanceof Collection)
+        {
+            final Collection<?> requestedSources = (Collection<?>) arguments.get("x-multiqueue");
+            for (Object object : requestedSources)
+            {
+                String sourceName = String.valueOf(object);
+                sourceName = sourceName.trim();
+                if (sourceName.length() != 0)
+                {
+                    MessageSource source = vHost.getMessageSource(sourceName);
+                    if (source == null)
+                    {
+                        // one unknown entry invalidates the whole request
+                        sources.clear();
+                        break;
+                    }
+                    else
+                    {
+                        sources.add(source);
+                    }
+                }
+            }
+            queueName = requestedSources.toString();
+        }
+
+        if (sources.isEmpty())
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("No queue for '" + queueName + "'");
+            }
+            if (queueName != null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'");
+            }
+            else
+            {
+                closeConnection(AMQConstant.NOT_ALLOWED, "No queue name provided, no default queue defined.");
+            }
+        }
+        else
+        {
+            // On the multi-queue path 'queue' is null, so the error messages must not
+            // dereference it; fall back to the textual form of the requested sources.
+            final String subscriptionTarget = queue != null ? queue.getName() : queueName;
+            try
+            {
+                consumerTag = _channel.consumeFromSource(consumerTag,
+                                                         sources,
+                                                         !noAck,
+                                                         arguments,
+                                                         exclusive,
+                                                         noLocal);
+                if (!nowait)
+                {
+                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
+                    _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+                }
+            }
+            catch (ConsumerTagInUseException cte)
+            {
+                closeConnection(AMQConstant.NOT_ALLOWED, "Non-unique consumer tag, '" + consumerTag + "'");
+            }
+            catch (AMQInvalidArgumentException ise)
+            {
+                closeConnection(AMQConstant.ARGUMENT_INVALID, ise.getMessage());
+            }
+            catch (AMQQueue.ExistingExclusiveConsumer e)
+            {
+                closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
+                                                            + subscriptionTarget
+                                                            + " as it already has an existing exclusive consumer");
+            }
+            catch (AMQQueue.ExistingConsumerPreventsExclusive e)
+            {
+                closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
+                                                            + subscriptionTarget
+                                                            + " exclusively as it already has a consumer");
+            }
+            catch (AccessControlException e)
+            {
+                closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
+                                                            + subscriptionTarget
+                                                            + " permission denied");
+            }
+            catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+            {
+                closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
+                                                            + subscriptionTarget
+                                                            + " as it already has an incompatible exclusivity policy");
+            }
+        }
+    }
+
+    /**
+     * Handles Basic.Get against the named queue, or the channel's default queue when no
+     * name is given. A missing queue closes the connection (NOT_FOUND when a name was given,
+     * NOT_ALLOWED otherwise). When performGet reports that nothing was delivered a
+     * Basic.GetEmpty is written. Refusals raised by the get close the connection.
+     *
+     * @param queueName queue to read from, or null for the channel's default queue
+     * @param noAck     inverted and passed to performGet as the "acks required" flag
+     */
+    @Override
+    public void receiveBasicGet(final AMQShortString queueName, final boolean noAck)
+    {
+        VirtualHostImpl vHost = _connection.getVirtualHost();
+        _channel.sync();
+        final AMQQueue queue =
+                queueName == null ? _channel.getDefaultQueue() : vHost.getQueue(queueName.toString());
+
+        if (queue == null)
+        {
+            _logger.info("No queue for '" + queueName + "'");
+            if (queueName == null)
+            {
+                closeConnection(AMQConstant.NOT_ALLOWED, "No queue name provided, no default queue defined.");
+            }
+            else
+            {
+                closeConnection(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'");
+            }
+            return;
+        }
+
+        try
+        {
+            final boolean delivered = performGet(queue, _connection, _channel, !noAck);
+            if (!delivered)
+            {
+                final BasicGetEmptyBody emptyBody = _connection.getMethodRegistry().createBasicGetEmptyBody(null);
+                _connection.writeFrame(emptyBody.generateFrame(getChannelId()));
+            }
+        }
+        catch (AccessControlException e)
+        {
+            closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
+        }
+        catch (MessageSource.ExistingExclusiveConsumer e)
+        {
+            closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer");
+        }
+        catch (MessageSource.ExistingConsumerPreventsExclusive e)
+        {
+            closeConnection(AMQConstant.INTERNAL_ERROR,
+                            "The GET request has been evaluated as an exclusive consumer, " +
+                            "this is likely due to a programming error in the Qpid broker");
+        }
+        catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+        {
+            closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an incompatible exclusivity policy");
+        }
+    }
+
+ @Override
+ public void receiveBasicPublish(final AMQShortString exchangeName,
+ final AMQShortString routingKey,
+ final boolean mandatory,
+ final boolean immediate)
+ {
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+
+ MessageDestination destination;
+
+ if (isDefaultExchange(exchangeName))
+ {
+ destination = vHost.getDefaultDestination();
+ }
+ else
+ {
+ destination = vHost.getMessageDestination(exchangeName.toString());
+ }
+
+ // if the exchange does not exist we raise a channel exception
+ if (destination == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " +
exchangeName);
+ }
+ else
+ {
+
+ MessagePublishInfo info = new MessagePublishInfo(exchangeName,
+ immediate,
+ mandatory,
+ routingKey);
+
+ try
+ {
+ _channel.setPublishFrame(info, destination);
+ }
+ catch (AccessControlException e)
+ {
+ closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
+
+ }
+ }
+ }
+
+    /**
+     * Handles Basic.Qos: applies the prefetch size and count as the channel's credit and
+     * writes a Basic.QosOk. The {@code global} flag is not consulted.
+     *
+     * @param prefetchSize  prefetch window in bytes, forwarded to the channel
+     * @param prefetchCount prefetch window in messages, forwarded to the channel
+     * @param global        ignored by this implementation
+     */
+    @Override
+    public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
+    {
+        _channel.sync();
+        _channel.setCredit(prefetchSize, prefetchCount);
+
+        final AMQMethodBody qosOk = _connection.getMethodRegistry().createBasicQosOkBody();
+        _connection.writeFrame(qosOk.generateFrame(getChannelId()));
+    }
+
+    /**
+     * Handles Basic.Recover: asks the channel to resend and, for the synchronous variant,
+     * writes a Basic.RecoverSyncOk. The {@code requeue} flag is not consulted.
+     *
+     * @param requeue ignored by this implementation
+     * @param sync    true when the client expects a RecoverSyncOk reply
+     */
+    @Override
+    public void receiveBasicRecover(final boolean requeue, final boolean sync)
+    {
+        _channel.resend();
+
+        if (!sync)
+        {
+            return;
+        }
+
+        final AMQMethodBody recoverOk = _connection.getMethodRegistry().createBasicRecoverSyncOkBody();
+        _channel.sync();
+        _connection.writeFrame(recoverOk.generateFrame(getChannelId()));
+    }
+
+    /**
+     * Handles Basic.Reject for one unacknowledged delivery.
+     * <p>
+     * Unknown tags and entries whose message is already gone are logged and dropped. With
+     * {@code requeue} set the delivery count is decremented and the message requeued. Without
+     * it, the outcome depends on the channel: when a maximum delivery count is not enabled for
+     * the tag the message is requeued; when it is enabled the message is dead-lettered if it
+     * has been delivered too many times, otherwise only its delivery count is incremented.
+     *
+     * @param deliveryTag tag of the delivery being rejected
+     * @param requeue     the client's requeue flag
+     */
+    @Override
+    public void receiveBasicReject(final long deliveryTag, final boolean requeue)
+    {
+        final MessageInstance rejected = _channel.getUnacknowledgedMessageMap().get(deliveryTag);
+
+        if (rejected == null)
+        {
+            _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+            return;
+        }
+
+        if (rejected.getMessage() == null)
+        {
+            _logger.warn("Message has already been purged, unable to Reject.");
+            return;
+        }
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Rejecting: DT:" + deliveryTag + "-" + rejected.getMessage() +
+                          ": Requeue:" + requeue +
+                          " on channel:" + _channel.debugIdentity());
+        }
+
+        if (requeue)
+        {
+            // this requeue represents a message rejected from the pre-dispatch queue,
+            // therefore the delivery counter has to be amended.
+            rejected.decrementDeliveryCount();
+            _channel.requeue(deliveryTag);
+            return;
+        }
+
+        // Since the Java client abuses the reject flag for requeing after rollback, reject is
+        // not set on the message here as it would prevent redelivery.
+        final boolean maxDeliveryCountEnabled = _channel.isMaxDeliveryCountEnabled(deliveryTag);
+        _logger.debug("maxDeliveryCountEnabled: "
+                      + maxDeliveryCountEnabled
+                      + " deliveryTag "
+                      + deliveryTag);
+
+        if (!maxDeliveryCountEnabled)
+        {
+            _channel.requeue(deliveryTag);
+            return;
+        }
+
+        final boolean deliveredTooManyTimes = _channel.isDeliveredTooManyTimes(deliveryTag);
+        _logger.debug("deliveredTooManyTimes: "
+                      + deliveredTooManyTimes
+                      + " deliveryTag "
+                      + deliveryTag);
+
+        if (deliveredTooManyTimes)
+        {
+            _channel.deadLetter(deliveryTag);
+        }
+        else
+        {
+            // this represents a message rejected because of a recover/rollback that we are
+            // not ready to DLQ. The reject command is relied upon to resend from the unacked
+            // map, so the delivery counter is incremented to cancel out the effect of the
+            // AMQChannel#resend() decrement.
+            rejected.incrementDeliveryCount();
+        }
+    }
+
+    /**
+     * Handles Channel.Close from the peer: closes the channel on the connection and then
+     * writes a Channel.CloseOk frame addressed to the channel's id.
+     */
+    @Override
+    public void receiveChannelClose()
+    {
+        _channel.sync();
+        _connection.closeChannel(_channel);
+
+        final AMQMethodBody closeOk = _connection.getMethodRegistry().createChannelCloseOkBody();
+        _connection.writeFrame(new AMQFrame(_channel.getChannelId(), closeOk));
+    }
+
+    /**
+     * Handles Channel.CloseOk by notifying the connection, via closeChannelOk, for this
+     * processor's channel id. No frame is written here.
+     */
+    @Override
+    public void receiveChannelCloseOk()
+    {
+        _connection.closeChannelOk(getChannelId());
+    }
+
+    /**
+     * Handles Channel.Flow: suspends the channel when {@code active} is false, resumes it when
+     * true, and echoes the flag back in a Channel.FlowOk.
+     *
+     * @param active the flow state requested by the client
+     */
+    @Override
+    public void receiveChannelFlow(final boolean active)
+    {
+        _channel.sync();
+        _channel.setSuspended(!active);
+
+        final AMQMethodBody flowOk = _connection.getMethodRegistry().createChannelFlowOkBody(active);
+        _connection.writeFrame(flowOk.generateFrame(getChannelId()));
+    }
+
+    /**
+     * Handles Exchange.Bound and always answers with an Exchange.BoundOk carrying a reply code
+     * and an optional reply text.
+     * <p>
+     * For the default exchange the answer is derived from queue existence: a named queue must
+     * exist and, when a routing key is also given, must equal it; with only a routing key a
+     * queue of that name must exist; with neither, any queue on the virtual host suffices.
+     * <p>
+     * For any other exchange the exchange must exist, and the answer is then derived from the
+     * exchange's own binding queries for the supplied combination of queue and routing key.
+     *
+     * @param exchangeName exchange to inspect
+     * @param queueName    queue to test, or null
+     * @param routingKey   routing key to test, or null
+     */
+    @Override
+    public void receiveExchangeBound(final AMQShortString exchangeName,
+                                     final AMQShortString queueName,
+                                     final AMQShortString routingKey)
+    {
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+        _channel.sync();
+
+        final int replyCode;
+        // stays null for every outcome that carries no explanatory text
+        String replyText = null;
+
+        if (isDefaultExchange(exchangeName))
+        {
+            if (queueName != null)
+            {
+                if (virtualHost.getQueue(queueName.toString()) == null)
+                {
+                    replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+                    replyText = "Queue '" + queueName + "' not found";
+                }
+                else if (routingKey == null || queueName.equals(routingKey))
+                {
+                    replyCode = ExchangeBoundOkBody.OK;
+                }
+                else
+                {
+                    replyCode = ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
+                }
+            }
+            else if (routingKey == null)
+            {
+                replyCode = virtualHost.getQueues().isEmpty()
+                        ? ExchangeBoundOkBody.NO_BINDINGS
+                        : ExchangeBoundOkBody.OK;
+            }
+            else
+            {
+                replyCode = virtualHost.getQueue(routingKey.toString()) == null
+                        ? ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK
+                        : ExchangeBoundOkBody.OK;
+            }
+        }
+        else
+        {
+            ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+            if (exchange == null)
+            {
+                replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND;
+                replyText = "Exchange '" + exchangeName + "' not found";
+            }
+            else if (queueName != null)
+            {
+                AMQQueue queue = virtualHost.getQueue(queueName.toString());
+                if (queue == null)
+                {
+                    replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+                    replyText = "Queue '" + queueName + "' not found";
+                }
+                else if (routingKey == null)
+                {
+                    if (exchange.isBound(queue))
+                    {
+                        replyCode = ExchangeBoundOkBody.OK;
+                    }
+                    else
+                    {
+                        replyCode = ExchangeBoundOkBody.QUEUE_NOT_BOUND;
+                        replyText = "Queue '"
+                                    + queueName
+                                    + "' not bound to exchange '"
+                                    + exchangeName
+                                    + "'";
+                    }
+                }
+                else if (exchange.isBound(routingKey.asString(), queue))
+                {
+                    replyCode = ExchangeBoundOkBody.OK;
+                }
+                else
+                {
+                    replyCode = ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
+                    replyText = "Queue '" + queueName + "' not bound with routing key '" +
+                                routingKey + "' to exchange '" + exchangeName + "'";
+                }
+            }
+            else if (routingKey == null)
+            {
+                replyCode = exchange.hasBindings()
+                        ? ExchangeBoundOkBody.OK
+                        : ExchangeBoundOkBody.NO_BINDINGS;
+            }
+            else if (exchange.isBound(routingKey.asString()))
+            {
+                replyCode = ExchangeBoundOkBody.OK;
+            }
+            else
+            {
+                replyCode = ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK;
+                replyText =
+                        "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'";
+            }
+        }
+
+        ExchangeBoundOkBody exchangeBoundOkBody =
+                methodRegistry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText));
+
+        _connection.writeFrame(exchangeBoundOkBody.generateFrame(getChannelId()));
+    }
+
+    /**
+     * Handles Exchange.Declare.
+     * <ul>
+     * <li>Default exchange: only a declaration of the direct type is tolerated; any other type
+     *     closes the connection with NOT_ALLOWED.</li>
+     * <li>Passive: the exchange must exist (otherwise the channel is closed with NOT_FOUND) and,
+     *     when a non-empty type is given, must have that type (otherwise the connection is
+     *     closed with NOT_ALLOWED).</li>
+     * <li>Active: the exchange is created from the supplied arguments. If it already exists
+     *     with the same type the declaration is treated as successful; a different type closes
+     *     the connection with NOT_ALLOWED. Other creation failures close the connection with
+     *     the code chosen in the matching catch block.</li>
+     * </ul>
+     * Every successful outcome is answered with Exchange.DeclareOk unless {@code nowait} is set.
+     *
+     * @param exchangeName name of the exchange
+     * @param type         exchange type
+     * @param passive      when true nothing is created, existence is only checked
+     * @param durable      durability of a newly created exchange
+     * @param autoDelete   selects DELETE_ON_NO_LINKS instead of PERMANENT for a new exchange
+     * @param internal     not consulted by this implementation
+     * @param nowait       when true no DeclareOk is written
+     * @param arguments    extra creation attributes; may be null
+     */
+    @Override
+    public void receiveExchangeDeclare(final AMQShortString exchangeName,
+                                       final AMQShortString type,
+                                       final boolean passive,
+                                       final boolean durable,
+                                       final boolean autoDelete,
+                                       final boolean internal,
+                                       final boolean nowait,
+                                       final FieldTable arguments)
+    {
+        ExchangeImpl exchange;
+        VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost();
+        if (isDefaultExchange(exchangeName))
+        {
+            if (!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type))
+            {
+                closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+                                                         + " of type "
+                                                         + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+                                                         + " to " + type + ".");
+            }
+            else
+            {
+                sendExchangeDeclareOk(nowait);
+            }
+        }
+        else if (passive)
+        {
+            exchange = virtualHost.getExchange(exchangeName.toString());
+            if (exchange == null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+            }
+            else if (!(type == null || type.length() == 0) && !exchange.getType().equals(type.asString()))
+            {
+                closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+                                                         + exchangeName
+                                                         + " of type "
+                                                         + exchange.getType()
+                                                         + " to "
+                                                         + type
+                                                         + ".");
+            }
+            else
+            {
+                sendExchangeDeclareOk(nowait);
+            }
+        }
+        else
+        {
+            try
+            {
+                String name = exchangeName == null ? null : exchangeName.intern().toString();
+                String typeString = type == null ? null : type.intern().toString();
+
+                Map<String, Object> attributes = new HashMap<String, Object>();
+                if (arguments != null)
+                {
+                    attributes.putAll(FieldTable.convertToMap(arguments));
+                }
+                attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+                attributes.put(org.apache.qpid.server.model.Exchange.NAME, name);
+                attributes.put(org.apache.qpid.server.model.Exchange.TYPE, typeString);
+                attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
+                attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+                               autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+                if (!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE))
+                {
+                    attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+                }
+                virtualHost.createExchange(attributes);
+
+                sendExchangeDeclareOk(nowait);
+            }
+            catch (ReservedExchangeNameException e)
+            {
+                closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to declare exchange: " + exchangeName +
+                                                         " which begins with reserved prefix.");
+            }
+            catch (ExchangeExistsException e)
+            {
+                exchange = e.getExistingExchange();
+                if (!new AMQShortString(exchange.getType()).equals(type))
+                {
+                    closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+                                                             + exchangeName + " of type "
+                                                             + exchange.getType()
+                                                             + " to " + type + ".");
+                }
+                else
+                {
+                    // Re-declaring an existing exchange with a matching type is a success and
+                    // must be acknowledged, otherwise a client that did not set nowait blocks
+                    // waiting for a DeclareOk that never arrives.
+                    sendExchangeDeclareOk(nowait);
+                }
+            }
+            catch (NoFactoryForTypeException e)
+            {
+                closeConnection(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"
+                                                             + e.getType()
+                                                             + "' for exchange '"
+                                                             + exchangeName
+                                                             + "'");
+            }
+            catch (AccessControlException e)
+            {
+                closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
+            }
+            catch (UnknownConfiguredObjectException e)
+            {
+                // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+                final String message = "Unknown alternate exchange "
+                                       + (e.getName() != null
+                                          ? "name: \"" + e.getName() + "\""
+                                          : "id: " + e.getId());
+                closeConnection(AMQConstant.NOT_FOUND, message);
+            }
+            catch (IllegalArgumentException e)
+            {
+                closeConnection(AMQConstant.COMMAND_INVALID, "Error creating exchange '"
+                                                             + exchangeName
+                                                             + "': "
+                                                             + e.getMessage());
+            }
+        }
+    }
+
+    /** Writes an Exchange.DeclareOk on this channel unless the client requested nowait. */
+    private void sendExchangeDeclareOk(final boolean nowait)
+    {
+        if (!nowait)
+        {
+            MethodRegistry methodRegistry = _connection.getMethodRegistry();
+            AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+            _channel.sync();
+            _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+        }
+    }
+
+    /**
+     * Handles Exchange.Delete. The default exchange cannot be deleted (connection closed with
+     * NOT_ALLOWED) and an unknown exchange closes the channel with NOT_FOUND. Otherwise the
+     * exchange is removed from the virtual host and an Exchange.DeleteOk is written; the
+     * {@code nowait} flag is not consulted.
+     *
+     * @param exchangeStr name of the exchange to delete
+     * @param ifUnused    inverted and passed as the second argument of removeExchange
+     * @param nowait      not consulted by this implementation
+     */
+    @Override
+    public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait)
+    {
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        _channel.sync();
+        try
+        {
+            if (isDefaultExchange(exchangeStr))
+            {
+                closeConnection(AMQConstant.NOT_ALLOWED,
+                                "Default Exchange cannot be deleted");
+                return;
+            }
+
+            final ExchangeImpl target = virtualHost.getExchange(exchangeStr.toString());
+            if (target == null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "No such exchange: " + exchangeStr);
+                return;
+            }
+
+            virtualHost.removeExchange(target, !ifUnused);
+
+            final ExchangeDeleteOkBody deleteOk = _connection.getMethodRegistry().createExchangeDeleteOkBody();
+            _connection.writeFrame(deleteOk.generateFrame(getChannelId()));
+        }
+        catch (ExchangeIsAlternateException e)
+        {
+            closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+        }
+        catch (RequiredExchangeException e)
+        {
+            closeChannel(AMQConstant.NOT_ALLOWED, "Exchange '" + exchangeStr + "' cannot be deleted");
+        }
+        catch (AccessControlException e)
+        {
+            closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
+        }
+    }
+
+    /**
+     * Handles Queue.Bind.
+     * <p>
+     * Without a queue name the channel's default queue is used and a missing routing key
+     * defaults to that queue's name; with a queue name a missing routing key defaults to the
+     * empty string. A missing queue or exchange closes the channel with NOT_FOUND, and binding
+     * to the default exchange closes the connection with NOT_ALLOWED. Otherwise the binding is
+     * added if not already present (on a topic exchange a binding that could not be added is
+     * replaced instead) and, unless {@code nowait} is set, a Queue.BindOk is written.
+     *
+     * @param queueName      queue to bind, or null for the channel's default queue
+     * @param exchange       exchange to bind to
+     * @param routingKey     binding key, or null to use the default described above
+     * @param nowait         when true no BindOk is written
+     * @param argumentsTable binding arguments
+     */
+    @Override
+    public void receiveQueueBind(final AMQShortString queueName,
+                                 final AMQShortString exchange,
+                                 AMQShortString routingKey,
+                                 final boolean nowait,
+                                 final FieldTable argumentsTable)
+    {
+        VirtualHostImpl virtualHost = _connection.getVirtualHost();
+        AMQQueue<?> queue;
+        if (queueName == null)
+        {
+            queue = _channel.getDefaultQueue();
+
+            if (queue != null)
+            {
+                if (routingKey == null)
+                {
+                    routingKey = AMQShortString.valueOf(queue.getName());
+                }
+                else
+                {
+                    routingKey = routingKey.intern();
+                }
+            }
+        }
+        else
+        {
+            queue = virtualHost.getQueue(queueName.toString());
+            routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey.intern();
+        }
+
+        if (queue == null)
+        {
+            String message = queueName == null
+                    ? "No default queue defined on channel and queue was null"
+                    : "Queue " + queueName + " does not exist.";
+            closeChannel(AMQConstant.NOT_FOUND, message);
+        }
+        else if (isDefaultExchange(exchange))
+        {
+            // Use the resolved queue's name: queueName is null when the channel's default
+            // queue was selected, which would otherwise render as "queue null".
+            closeConnection(AMQConstant.NOT_ALLOWED,
+                            "Cannot bind the queue " + queue.getName() + " to the default exchange"
+                           );
+        }
+        else
+        {
+            final String exchangeName = exchange.toString();
+
+            final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
+            if (exch == null)
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.");
+            }
+            else
+            {
+                try
+                {
+                    Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable);
+                    String bindingKey = String.valueOf(routingKey);
+
+                    if (!exch.isBound(bindingKey, arguments, queue))
+                    {
+                        // addBinding reporting false on a topic exchange is answered by
+                        // replacing the binding with the new arguments
+                        if (!exch.addBinding(bindingKey, queue, arguments)
+                            && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType()))
+                        {
+                            exch.replaceBinding(bindingKey, queue, arguments);
+                        }
+                    }
+
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Binding queue "
+                                     + queue
+                                     + " to exchange "
+                                     + exch
+                                     + " with routing key "
+                                     + routingKey);
+                    }
+                    if (!nowait)
+                    {
+                        _channel.sync();
+                        MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                        AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+                        _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+                    }
+                }
+                catch (AccessControlException e)
+                {
+                    closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
+                }
+            }
+        }
+    }
+
+ @Override
+ public void receiveQueueDeclare(final AMQShortString queueStr,
+ final boolean passive,
+ final boolean durable,
+ final boolean exclusive,
+ final boolean autoDelete,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ final AMQShortString queueName;
+
+ // if we aren't given a queue name, we create one which we return to
the client
+ if ((queueStr == null) || (queueStr.length() == 0))
+ {
+ queueName = new AMQShortString("tmp_" + UUID.randomUUID());
+ }
+ else
+ {
+ queueName = queueStr.intern();
+ }
+
+ AMQQueue queue;
+
+ //TODO: do we need to check that the queue already exists with exactly
the same "configuration"?
+
+
+ if (passive)
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND,
+ "Queue: " + queueName + " not found on
VirtualHost(" + virtualHost + ").");
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(_channel))
+ {
+ closeConnection(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this
Connection.");
+ }
+ else
+ {
+ //set this as the default queue on the channel:
+ _channel.setDefaultQueue(queue);
+ if (!nowait)
+ {
+ _channel.sync();
+ MethodRegistry methodRegistry =
_connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+
methodRegistry.createQueueDeclareOkBody(queueName,
+
queue.getQueueDepthMessages(),
+
queue.getConsumerCount());
+
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ _logger.info("Queue " + queueName + " declared
successfully");
+ }
+ }
+ }
+ }
+ else
+ {
+
+ try
+ {
+ Map<String, Object> attributes =
+
QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments));
+ final String queueNameString =
AMQShortString.toString(queueName);
+ attributes.put(Queue.NAME, queueNameString);
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.DURABLE, durable);
+
+ LifetimePolicy lifetimePolicy;
+ ExclusivityPolicy exclusivityPolicy;
+
+ if (exclusive)
+ {
+ lifetimePolicy = autoDelete
+ ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+ : durable ? LifetimePolicy.PERMANENT :
LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+ exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER
: ExclusivityPolicy.CONNECTION;
+ }
+ else
+ {
+ lifetimePolicy = autoDelete ?
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+ exclusivityPolicy = ExclusivityPolicy.NONE;
+ }
+
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+ queue = virtualHost.createQueue(attributes);
+
+ _channel.setDefaultQueue(queue);
+
+ if (!nowait)
+ {
+ _channel.sync();
+ MethodRegistry methodRegistry =
_connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+
queue.getQueueDepthMessages(),
+
queue.getConsumerCount());
+
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ _logger.info("Queue " + queueName + " declared
successfully");
+ }
+ }
+ catch (QueueExistsException qe)
+ {
+
+ queue = qe.getExistingQueue();
+
+ if (!queue.verifySessionAccess(_channel))
+ {
+ closeConnection(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this
Connection.");
+
+ }
+ else if (queue.isExclusive() != exclusive)
+ {
+
+ closeChannel(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different exclusivity (was: "
+ + queue.isExclusive()
+ + " requested "
+ + exclusive
+ + ")");
+ }
+ else if ((autoDelete
+ && queue.getLifetimePolicy() !=
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+ || (!autoDelete && queue.getLifetimePolicy() !=
((exclusive
+ &&
!durable)
+ ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ : LifetimePolicy.PERMANENT)))
+ {
+ closeChannel(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different lifetime policy (was: "
+ + queue.getLifetimePolicy()
+ + " requested autodelete: "
+ + autoDelete
+ + ")");
+ }
+ else if (queue.isDurable() != durable)
+ {
+ closeChannel(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different durability (was: "
+ + queue.isDurable()
+ + " requested "
+ + durable
+ + ")");
+ }
+ else
+ {
+ _channel.setDefaultQueue(queue);
+ if (!nowait)
+ {
+ _channel.sync();
+ MethodRegistry methodRegistry =
_connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+
methodRegistry.createQueueDeclareOkBody(queueName,
+
queue.getQueueDepthMessages(),
+
queue.getConsumerCount());
+
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ _logger.info("Queue " + queueName + " declared
successfully");
+ }
+ }
+ }
+ catch (AccessControlException e)
+ {
+ closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ }
+
+ }
+ }
+
+    /**
+     * Handles a Queue.Delete request received on this channel.
+     *
+     * <p>A {@code null} queue name selects the channel's default queue. The channel is closed when
+     * the queue cannot be resolved (NOT_FOUND) or when a requested {@code ifEmpty} /
+     * {@code ifUnused} precondition does not hold (IN_USE). The connection is closed when the
+     * queue fails {@code verifySessionAccess} (NOT_ALLOWED) or when removing the queue raises an
+     * {@link AccessControlException} (ACCESS_REFUSED). Otherwise the queue is removed from the
+     * virtual host and a Queue.Delete-Ok frame carrying the value returned by
+     * {@code removeQueue} is written.</p>
+     *
+     * @param queueName name of the queue to delete, or {@code null} for the channel's default queue
+     * @param ifUnused  when {@code true}, the delete is refused unless {@code queue.isUnused()}
+     * @param ifEmpty   when {@code true}, the delete is refused unless {@code queue.isEmpty()}
+     * @param nowait    not consulted by this method; the Delete-Ok frame is written regardless
+     */
+    @Override
+    public void receiveQueueDelete(final AMQShortString queueName,
+                                   final boolean ifUnused,
+                                   final boolean ifEmpty,
+                                   final boolean nowait)
+    {
+        final VirtualHostImpl vhost = _connection.getVirtualHost();
+        _channel.sync();
+
+        // A missing name means "the default queue of this channel".
+        final AMQQueue target = (queueName == null)
+                ? _channel.getDefaultQueue()
+                : vhost.getQueue(queueName.toString());
+
+        if (target == null)
+        {
+            closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
+        }
+        else if (ifEmpty && !target.isEmpty())
+        {
+            closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is not empty.");
+        }
+        else if (ifUnused && !target.isUnused())
+        {
+            // TODO - Error code
+            closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is still used.");
+        }
+        else if (!target.verifySessionAccess(_channel))
+        {
+            closeConnection(AMQConstant.NOT_ALLOWED,
+                            "Queue "
+                            + target.getName()
+                            + " is exclusive, but not created on this Connection.");
+        }
+        else
+        {
+            try
+            {
+                final int deletedCount = vhost.removeQueue(target);
+
+                final MethodRegistry registry = _connection.getMethodRegistry();
+                final QueueDeleteOkBody deleteOk = registry.createQueueDeleteOkBody(deletedCount);
+                _connection.writeFrame(deleteOk.generateFrame(getChannelId()));
+            }
+            catch (AccessControlException e)
+            {
+                closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Handles a Queue.Purge request received on this channel.
+     *
+     * <p>A {@code null} queue name selects the channel's default queue; if there is none the
+     * connection is closed (NOT_ALLOWED). A named queue that cannot be found closes the channel
+     * (NOT_FOUND). A queue that fails {@code verifySessionAccess} closes the connection
+     * (NOT_ALLOWED), as does an {@link AccessControlException} raised while purging
+     * (ACCESS_REFUSED). Otherwise the queue is cleared and, unless {@code nowait} is set, a
+     * Queue.Purge-Ok frame carrying the value returned by {@code clearQueue} is written.</p>
+     *
+     * @param queueName name of the queue to purge, or {@code null} for the channel's default queue
+     * @param nowait    when {@code true}, no Purge-Ok frame is written
+     */
+    @Override
+    public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait)
+    {
+        final VirtualHostImpl vhost = _connection.getVirtualHost();
+        final boolean useDefaultQueue = (queueName == null);
+        final AMQQueue target = useDefaultQueue
+                ? _channel.getDefaultQueue()
+                : vhost.getQueue(queueName.toString());
+
+        if (target == null)
+        {
+            // The two "no queue" cases are reported differently: connection-level vs channel-level.
+            if (useDefaultQueue)
+            {
+                closeConnection(AMQConstant.NOT_ALLOWED, "No queue specified.");
+            }
+            else
+            {
+                closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
+            }
+            return;
+        }
+
+        if (!target.verifySessionAccess(_channel))
+        {
+            closeConnection(AMQConstant.NOT_ALLOWED,
+                            "Queue is exclusive, but not created on this Connection.");
+            return;
+        }
+
+        try
+        {
+            final long purgedCount = target.clearQueue();
+            if (!nowait)
+            {
+                _channel.sync();
+                final AMQMethodBody purgeOk =
+                        _connection.getMethodRegistry().createQueuePurgeOkBody(purgedCount);
+                _connection.writeFrame(purgeOk.generateFrame(getChannelId()));
+            }
+        }
+        catch (AccessControlException e)
+        {
+            closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
+        }
+    }
+
+    /**
+     * Handles a Queue.Unbind request received on this channel.
+     *
+     * <p>A {@code null} queue name selects the channel's default queue. The channel is closed with
+     * NOT_FOUND when the queue, the exchange, or the binding cannot be found. The connection is
+     * closed when the default exchange is addressed (NOT_ALLOWED) or when deleting the binding
+     * raises an {@link AccessControlException} (ACCESS_REFUSED). On success the binding is deleted
+     * and a Queue.Unbind-Ok frame is written.</p>
+     *
+     * @param queueName  name of the bound queue, or {@code null} for the channel's default queue
+     * @param exchange   name of the exchange; {@code null} or empty denotes the default exchange
+     * @param routingKey binding key; converted with {@code String.valueOf}, so {@code null}
+     *                   becomes the string {@code "null"}
+     * @param arguments  not read by this method
+     */
+    @Override
+    public void receiveQueueUnbind(final AMQShortString queueName,
+                                   final AMQShortString exchange,
+                                   AMQShortString routingKey,
+                                   final FieldTable arguments)
+    {
+        final VirtualHostImpl vhost = _connection.getVirtualHost();
+
+        final AMQQueue boundQueue;
+        if (queueName == null)
+        {
+            boundQueue = _channel.getDefaultQueue();
+        }
+        else
+        {
+            boundQueue = vhost.getQueue(queueName.toString());
+        }
+
+        if (boundQueue == null)
+        {
+            final String reason;
+            if (queueName == null)
+            {
+                reason = "No default queue defined on channel and queue was null";
+            }
+            else
+            {
+                reason = "Queue " + queueName + " does not exist.";
+            }
+            closeChannel(AMQConstant.NOT_FOUND, reason);
+            return;
+        }
+
+        if (isDefaultExchange(exchange))
+        {
+            closeConnection(AMQConstant.NOT_ALLOWED,
+                            "Cannot unbind the queue "
+                            + boundQueue.getName()
+                            + " from the default exchange");
+            return;
+        }
+
+        // exchange is known to be non-null here: isDefaultExchange() returns true for null.
+        final ExchangeImpl sourceExchange = vhost.getExchange(exchange.toString());
+        if (sourceExchange == null)
+        {
+            closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchange + " does not exist.");
+            return;
+        }
+
+        final String bindingKey = String.valueOf(routingKey);
+        if (!sourceExchange.hasBinding(bindingKey, boundQueue))
+        {
+            closeChannel(AMQConstant.NOT_FOUND, "No such binding");
+            return;
+        }
+
+        try
+        {
+            sourceExchange.deleteBinding(bindingKey, boundQueue);
+
+            final AMQMethodBody unbindOk = _connection.getMethodRegistry().createQueueUnbindOkBody();
+            _channel.sync();
+            _connection.writeFrame(unbindOk.generateFrame(getChannelId()));
+        }
+        catch (AccessControlException e)
+        {
+            closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage());
+        }
+    }
+
+    /**
+     * Handles a Tx.Select request: marks the channel as locally transactional and writes a
+     * Tx.Select-Ok frame on this channel.
+     */
+    @Override
+    public void receiveTxSelect()
+    {
+        _channel.setLocalTransactional();
+
+        final TxSelectOkBody selectOk = _connection.getMethodRegistry().createTxSelectOkBody();
+        _connection.writeFrame(selectOk.generateFrame(getChannelId()));
+    }
+
+    /**
+     * Handles a Tx.Commit request received on this channel.
+     *
+     * <p>If the channel is not transactional it is closed with COMMAND_INVALID and no commit is
+     * attempted. Otherwise the commit is delegated to the channel together with a task that
+     * writes the Tx.Commit-Ok frame; when that task runs is decided by
+     * {@code AMQChannel.commit}.</p>
+     */
+    @Override
+    public void receiveTxCommit()
+    {
+        if (!_channel.isTransactional())
+        {
+            closeChannel(AMQConstant.COMMAND_INVALID,
+                         "Fatal error: commit called on non-transactional channel");
+        }
+        else
+        {
+            // Only commit when the channel really is transactional; previously execution fell
+            // through and committed on a channel that had just been closed.
+            _channel.commit(new Runnable()
+            {
+
+                @Override
+                public void run()
+                {
+                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+                    _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+                }
+            }, true);
+        }
+    }
+
+    /**
+     * Handles a Tx.Rollback request received on this channel.
+     *
+     * <p>If the channel is not transactional it is closed with COMMAND_INVALID and nothing is
+     * rolled back or resent. Otherwise the rollback is delegated to the channel together with a
+     * task that writes the Tx.Rollback-Ok frame, and afterwards {@code _channel.resend()} is
+     * invoked.</p>
+     */
+    @Override
+    public void receiveTxRollback()
+    {
+        if (!_channel.isTransactional())
+        {
+            closeChannel(AMQConstant.COMMAND_INVALID,
+                         "Fatal error: rollback called on non-transactional channel");
+        }
+        else
+        {
+            // Only roll back when the channel really is transactional; previously execution fell
+            // through and rolled back / resent on a channel that had just been closed.
+            final MethodRegistry methodRegistry = _connection.getMethodRegistry();
+            final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+
+            Runnable task = new Runnable()
+            {
+
+                @Override
+                public void run()
+                {
+                    _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+                }
+            };
+
+            _channel.rollback(task);
+
+            //Now resend all the unacknowledged messages back to the original subscribers.
+            //(Must be done after the TxnRollback-ok response).
+            // Why, are we not allowed to send messages back to client before the ok method?
+            _channel.resend();
+        }
+    }
+
+ /**
+ * Delegates to {@code _connection.closeChannelAndWriteFrame}, passing this processor's
+ * channel together with the supplied cause and message.
+ *
+ * @param cause AMQP constant handed through as the reason for closing
+ * @param message text handed through alongside the cause
+ */
+ private void closeChannel(final AMQConstant cause, final String message)
+ {
+ _connection.closeChannelAndWriteFrame(_channel, cause, message);
+ }
+
+ /**
+ * Delegates to {@code _connection.closeConnection}, passing the supplied cause and message
+ * plus the id of this processor's channel.
+ *
+ * @param cause AMQP constant handed through as the reason for closing
+ * @param message text handed through alongside the cause
+ */
+ private void closeConnection(final AMQConstant cause, final String message)
+ {
+ _connection.closeConnection(cause, message, getChannelId());
+ }
+
+ /**
+ * @return the id reported by this processor's channel ({@code _channel.getChannelId()})
+ */
+ private int getChannelId()
+ {
+ return _channel.getChannelId();
+ }
+
+    /**
+     * Tells whether an exchange name denotes the default exchange.
+     *
+     * @param exchangeName exchange name taken from a request; may be {@code null}
+     * @return {@code true} when the name is {@code null} or equal to
+     *         {@code AMQShortString.EMPTY_STRING}, {@code false} otherwise
+     */
+    private boolean isDefaultExchange(final AMQShortString exchangeName)
+    {
+        if (exchangeName == null)
+        {
+            return true;
+        }
+        return AMQShortString.EMPTY_STRING.equals(exchangeName);
+    }
+
+    /**
+     * Performs a one-shot "get" against a queue on behalf of a channel.
+     *
+     * <p>A temporary consumer is registered on the queue with the options TRANSIENT, ACQUIRES and
+     * SEES_REQUEUES, flushed once and then closed. Delivery goes through a
+     * {@code GetDeliveryMethod}, and each recorded delivery is added to the channel's
+     * unacknowledged messages. The credit manager is a {@code MessageOnlyCreditManager(1L)},
+     * which presumably limits the get to a single message - confirm against that class.</p>
+     *
+     * @param queue      queue to read from
+     * @param connection connection passed to the {@code GetDeliveryMethod}
+     * @param channel    channel on whose behalf the get is performed
+     * @param acks       {@code true} to build the target with {@code createAckTarget},
+     *                   {@code false} to build it with {@code createGetNoAckTarget}
+     * @return {@code true} if the delivery method was invoked for a message
+     * @throws MessageSource.ExistingConsumerPreventsExclusive propagated from {@code addConsumer}
+     * @throws MessageSource.ExistingExclusiveConsumer propagated from {@code addConsumer}
+     * @throws MessageSource.ConsumerAccessRefused propagated from {@code addConsumer}
+     */
+    public static boolean performGet(final AMQQueue queue,
+                                     final AMQProtocolEngine connection,
+                                     final AMQChannel channel,
+                                     final boolean acks)
+            throws MessageSource.ExistingConsumerPreventsExclusive,
+                   MessageSource.ExistingExclusiveConsumer,
+                   MessageSource.ConsumerAccessRefused
+    {
+        final FlowCreditManager oneMessageCredit = new MessageOnlyCreditManager(1L);
+        final GetDeliveryMethod delivery =
+                new GetDeliveryMethod(oneMessageCredit, connection, channel, queue);
+
+        final RecordDeliveryMethod recorder = new RecordDeliveryMethod()
+        {
+
+            public void recordMessageDelivery(final ConsumerImpl sub,
+                                              final MessageInstance entry,
+                                              final long deliveryTag)
+            {
+                channel.addUnacknowledgedMessage(entry, deliveryTag, null);
+            }
+        };
+
+        // Both target flavours get the same tag, credit, delivery and record collaborators.
+        final ConsumerTarget_0_8 target = acks
+                ? ConsumerTarget_0_8.createAckTarget(channel,
+                                                     AMQShortString.EMPTY_STRING, null,
+                                                     oneMessageCredit, delivery, recorder)
+                : ConsumerTarget_0_8.createGetNoAckTarget(channel,
+                                                          AMQShortString.EMPTY_STRING, null,
+                                                          oneMessageCredit, delivery, recorder);
+
+        final EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT,
+                                                                ConsumerImpl.Option.ACQUIRES,
+                                                                ConsumerImpl.Option.SEES_REQUEUES);
+
+        final ConsumerImpl consumer = queue.addConsumer(target, null, AMQMessage.class, "", options);
+        consumer.flush();
+        consumer.close();
+        return delivery.hasDeliveredMessage();
+    }
+
+
+    /**
+     * Delivery method used by {@code performGet}: writes a Get-Ok for each message handed to it
+     * and remembers whether it was invoked at all.
+     */
+    private static class GetDeliveryMethod implements ClientDeliveryMethod
+    {
+        private final FlowCreditManager _singleMessageCredit;
+        private final AMQProtocolEngine _connection;
+        private final AMQChannel _channel;
+        private final AMQQueue _queue;
+
+        /** Set once {@link #deliverToClient} has completed for a message. */
+        private boolean _deliveredMessage;
+
+        /**
+         * @param singleMessageCredit credit manager charged for each delivered message
+         * @param connection          connection whose output converter writes the Get-Ok
+         * @param channel             channel whose id is put on the Get-Ok
+         * @param queue               queue whose current depth is reported in the Get-Ok
+         */
+        public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+                                 final AMQProtocolEngine connection,
+                                 final AMQChannel channel,
+                                 final AMQQueue queue)
+        {
+            this._singleMessageCredit = singleMessageCredit;
+            this._connection = connection;
+            this._channel = channel;
+            this._queue = queue;
+        }
+
+        /**
+         * Uses credit for the message, writes a Get-Ok through the connection's protocol output
+         * converter, and records that a message was delivered.
+         *
+         * @param sub         consumer the message is delivered for; not read here
+         * @param message     message to deliver
+         * @param props       instance properties passed on to {@code writeGetOk}
+         * @param deliveryTag delivery tag passed on to {@code writeGetOk}
+         * @return the value returned by {@code writeGetOk}
+         */
+        @Override
+        public long deliverToClient(final ConsumerImpl sub,
+                                    final ServerMessage message,
+                                    final InstanceProperties props,
+                                    final long deliveryTag)
+        {
+            _singleMessageCredit.useCreditForMessage(message.getSize());
+
+            final long writeResult =
+                    _connection.getProtocolOutputConverter().writeGetOk(message,
+                                                                        props,
+                                                                        _channel.getChannelId(),
+                                                                        deliveryTag,
+                                                                        _queue.getQueueDepthMessages());
+
+            // Only flagged after the write call has returned.
+            _deliveredMessage = true;
+            return writeResult;
+        }
+
+        /**
+         * @return {@code true} once {@link #deliverToClient} has completed for a message
+         */
+        public boolean hasDeliveredMessage()
+        {
+            return _deliveredMessage;
+        }
+    }
+
+}
Propchange:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTagInUseException.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTagInUseException.java?rev=1631137&view=auto
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTagInUseException.java
(added)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTagInUseException.java
Sat Oct 11 23:46:39 2014
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+/**
+ * Checked exception whose only state is the detail message given to its constructor.
+ * NOTE(review): presumably signals that a requested consumer tag is already in use; the
+ * throwing code is not visible here - confirm against callers.
+ */
+public class ConsumerTagInUseException extends Exception
+{
+ /**
+ * @param message detail message, passed unchanged to the {@link Exception} constructor
+ */
+ public ConsumerTagInUseException(final String message)
+ {
+ super(message);
+ }
+}
Propchange:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTagInUseException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1631137&r1=1631136&r2=1631137&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
Sat Oct 11 23:46:39 2014
@@ -79,7 +79,7 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
AMQShortString
consumerTag, FieldTable filters,
- FlowCreditManager
creditManager) throws AMQException
+ FlowCreditManager
creditManager)
{
return new BrowserConsumer(channel, consumerTag, filters,
creditManager, channel.getClientDeliveryMethod(),
channel.getRecordDeliveryMethod());
}
@@ -89,7 +89,7 @@ public abstract class ConsumerTarget_0_8
final FieldTable
filters,
final
FlowCreditManager creditManager,
final
ClientDeliveryMethod deliveryMethod,
- final
RecordDeliveryMethod recordMethod) throws AMQException
+ final
RecordDeliveryMethod recordMethod)
{
return new GetNoAckConsumer(channel, consumerTag, filters,
creditManager, deliveryMethod, recordMethod);
}
@@ -106,7 +106,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
super(channel, consumerTag,
filters, creditManager, deliveryMethod, recordMethod);
@@ -147,7 +146,7 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
AMQShortString
consumerTag, FieldTable filters,
- FlowCreditManager
creditManager) throws AMQException
+ FlowCreditManager
creditManager)
{
return new NoAckConsumer(channel, consumerTag, filters, creditManager,
channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
@@ -170,7 +169,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
super(channel, consumerTag, filters, creditManager,
deliveryMethod, recordMethod);
@@ -206,7 +204,7 @@ public abstract class ConsumerTarget_0_8
long size;
synchronized (getChannel())
{
- getChannel().getProtocolSession().setDeferFlush(batch);
+ getChannel().getConnection().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
size = sendToClient(consumer, message, props, deliveryTag);
@@ -248,7 +246,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
super(channel, consumerTag, filters, creditManager,
deliveryMethod, recordMethod);
}
@@ -264,7 +261,6 @@ public abstract class ConsumerTarget_0_8
public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
AMQShortString
consumerTag, FieldTable filters,
FlowCreditManager
creditManager)
- throws AMQException
{
return new AckConsumer(channel,consumerTag,filters,creditManager,
channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
@@ -275,7 +271,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager
creditManager,
ClientDeliveryMethod
deliveryMethod,
RecordDeliveryMethod
recordMethod)
- throws AMQException
{
return new AckConsumer(channel,consumerTag,filters,creditManager,
deliveryMethod, recordMethod);
}
@@ -287,7 +282,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
super(channel, consumerTag, filters, creditManager,
deliveryMethod, recordMethod);
}
@@ -307,7 +301,7 @@ public abstract class ConsumerTarget_0_8
synchronized (getChannel())
{
- getChannel().getProtocolSession().setDeferFlush(batch);
+ getChannel().getConnection().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
addUnacknowledgedMessage(entry);
@@ -345,7 +339,6 @@ public abstract class ConsumerTarget_0_8
FlowCreditManager creditManager,
ClientDeliveryMethod deliveryMethod,
RecordDeliveryMethod recordMethod)
- throws AMQException
{
super(State.ACTIVE);
@@ -473,9 +466,9 @@ public abstract class ConsumerTarget_0_8
return _consumerTag;
}
- public AMQProtocolSession getProtocolSession()
+ public AMQProtocolEngine getProtocolSession()
{
- return _channel.getProtocolSession();
+ return _channel.getConnection();
}
public void restoreCredit(final ServerMessage message)
@@ -524,7 +517,7 @@ public abstract class ConsumerTarget_0_8
public void confirmAutoClose()
{
- ProtocolOutputConverter converter =
getChannel().getProtocolSession().getProtocolOutputConverter();
+ ProtocolOutputConverter converter =
getChannel().getConnection().getProtocolOutputConverter();
converter.confirmConsumerAutoClose(getChannel().getChannelId(),
getConsumerTag());
}
@@ -539,9 +532,9 @@ public abstract class ConsumerTarget_0_8
public void flushBatched()
{
- _channel.getProtocolSession().setDeferFlush(false);
+ _channel.getConnection().setDeferFlush(false);
- _channel.getProtocolSession().flushBatched();
+ _channel.getConnection().flushBatched();
}
protected void addUnacknowledgedMessage(MessageInstance entry)
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1631137&r1=1631136&r2=1631137&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
Sat Oct 11 23:46:39 2014
@@ -20,14 +20,13 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.Map;
+
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
-import java.util.Map;
-
public class ExtractResendAndRequeue implements
UnacknowledgedMessageMap.Visitor
{
private static final Logger _log =
Logger.getLogger(ExtractResendAndRequeue.class);
@@ -45,7 +44,7 @@ public class ExtractResendAndRequeue imp
_msgToResend = msgToResend;
}
- public boolean callback(final long deliveryTag, MessageInstance message)
throws AMQException
+ public boolean callback(final long deliveryTag, MessageInstance message)
{
message.setRedelivered();
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1631137&r1=1631136&r2=1631137&view=diff
==============================================================================
---
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
(original)
+++
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
Sat Oct 11 23:46:39 2014
@@ -48,12 +48,12 @@ public class ProtocolOutputConverterImpl
{
private static final int BASIC_CLASS_ID = 60;
- private final AMQProtocolSession _protocolSession;
+ private final AMQProtocolEngine _connection;
private static final AMQShortString GZIP_ENCODING =
AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING);
- public ProtocolOutputConverterImpl(AMQProtocolSession session)
+ public ProtocolOutputConverterImpl(AMQProtocolEngine connection)
{
- _protocolSession = session;
+ _connection = connection;
}
@@ -76,7 +76,7 @@ public class ProtocolOutputConverterImpl
}
else
{
- return getMessageConverter(serverMessage).convert(serverMessage,
_protocolSession.getVirtualHost());
+ return getMessageConverter(serverMessage).convert(serverMessage,
_connection.getVirtualHost());
}
}
@@ -99,7 +99,7 @@ public class ProtocolOutputConverterImpl
byte[] modifiedContent;
// straight through case
- boolean compressionSupported =
_protocolSession.isCompressionSupported();
+ boolean compressionSupported = _connection.isCompressionSupported();
if(msgCompressed && !compressionSupported &&
(modifiedContent =
GZIPUtils.uncompressBufferToArray(message.getContent(0,bodySize))) != null)
@@ -115,7 +115,7 @@ public class ProtocolOutputConverterImpl
else if(!msgCompressed
&& compressionSupported
&& contentHeaderBody.getProperties().getEncoding()==null
- && bodySize > _protocolSession.getMessageCompressionThreshold()
+ && bodySize > _connection.getMessageCompressionThreshold()
&& (modifiedContent =
GZIPUtils.compressBufferToArray(message.getContent(0, bodySize))) != null)
{
BasicContentHeaderProperties modifiedProps =
@@ -182,7 +182,7 @@ public class ProtocolOutputConverterImpl
}
else
{
- int maxBodySize = (int) _protocolSession.getMaxFrameSize() -
AMQFrame.getFrameOverhead();
+ int maxBodySize = (int) _connection.getMaxFrameSize() -
AMQFrame.getFrameOverhead();
int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
@@ -316,7 +316,7 @@ public class ProtocolOutputConverterImpl
public AMQBody createAMQBody()
{
- return
_protocolSession.getMethodRegistry().createBasicDeliverBody(_consumerTag,
+ return
_connection.getMethodRegistry().createBasicDeliverBody(_consumerTag,
_deliveryTag,
_isRedelivered,
_exchangeName,
@@ -372,7 +372,7 @@ public class ProtocolOutputConverterImpl
final boolean isRedelivered =
Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED));
BasicGetOkBody getOkBody =
-
_protocolSession.getMethodRegistry().createBasicGetOkBody(deliveryTag,
+
_connection.getMethodRegistry().createBasicGetOkBody(deliveryTag,
isRedelivered,
exchangeName,
routingKey,
@@ -387,7 +387,7 @@ public class ProtocolOutputConverterImpl
{
BasicReturnBody basicReturnBody =
-
_protocolSession.getMethodRegistry().createBasicReturnBody(replyCode,
+
_connection.getMethodRegistry().createBasicReturnBody(replyCode,
replyText,
messagePublishInfo.getExchange(),
messagePublishInfo.getRoutingKey());
@@ -407,14 +407,14 @@ public class ProtocolOutputConverterImpl
public void writeFrame(AMQDataBlock block)
{
- _protocolSession.writeFrame(block);
+ _connection.writeFrame(block);
}
public void confirmConsumerAutoClose(int channelId, AMQShortString
consumerTag)
{
- BasicCancelOkBody basicCancelOkBody =
_protocolSession.getMethodRegistry().createBasicCancelOkBody(consumerTag);
+ BasicCancelOkBody basicCancelOkBody =
_connection.getMethodRegistry().createBasicCancelOkBody(consumerTag);
writeFrame(basicCancelOkBody.generateFrame(channelId));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]