Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1683186&r1=1683185&r2=1683186&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jun 2 20:53:49 2015 @@ -70,8 +70,6 @@ import org.apache.qpid.client.util.JMSEx import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ListMessage; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; @@ -530,21 +528,21 @@ public abstract class AMQSession<C exten public void setLegacyFieldsForQueueType(AMQDestination dest) { // legacy support - dest.setQueueName(new AMQShortString(dest.getAddressName())); - dest.setExchangeName(new AMQShortString("")); - dest.setExchangeClass(new AMQShortString("")); + dest.setQueueName(dest.getAddressName()); + dest.setExchangeName(""); + dest.setExchangeClass(""); dest.setRoutingKey(dest.getAMQQueueName()); } public void setLegacyFieldsForTopicType(AMQDestination dest) { // legacy support - dest.setExchangeName(new AMQShortString(dest.getAddressName())); + dest.setExchangeName(dest.getAddressName()); Node node = dest.getNode(); - dest.setExchangeClass(node.getExchangeType() == null? - AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS): - new AMQShortString(node.getExchangeType())); - dest.setRoutingKey(new AMQShortString(dest.getSubject())); + dest.setExchangeClass(node.getExchangeType() == null + ? ExchangeDefaults.TOPIC_EXCHANGE_CLASS + : node.getExchangeType()); + dest.setRoutingKey(dest.getSubject()); } protected void verifySubject(AMQDestination dest) throws AMQException @@ -552,14 +550,14 @@ public abstract class AMQSession<C exten if (dest.getSubject() == null || dest.getSubject().trim().equals("")) { - if ("topic".equals(dest.getExchangeClass().toString())) + if ("topic".equals(dest.getExchangeClass())) { - dest.setRoutingKey(new AMQShortString("#")); - dest.setSubject(dest.getRoutingKey().toString()); + dest.setRoutingKey("#"); + dest.setSubject(dest.getRoutingKey()); } else { - dest.setRoutingKey(new AMQShortString("")); + dest.setRoutingKey(""); dest.setSubject(""); } } @@ -677,14 +675,14 @@ public abstract class AMQSession<C exten * TODO Be aware of possible changes to parameter order as versions change. * TODO Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ - public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination destination) throws AMQException + public void bindQueue(final String queueName, final String routingKey, final Map<String,Object> arguments, + final String exchangeName, final AMQDestination destination) throws AMQException { bindQueue(queueName, routingKey, arguments, exchangeName, destination, false); } - public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination destination, + public void bindQueue(final String queueName, final String routingKey, final Map<String,Object> arguments, + final String exchangeName, final AMQDestination destination, final boolean nowait) throws AMQException { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ @@ -702,12 +700,12 @@ public abstract class AMQSession<C exten { if (consumer.getQueuename() != null) { - bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName(), amqd); + bindQueue(consumer.getQueuename(), routingKey, new HashMap<String, Object>(), amqd.getExchangeName(), amqd); } } - public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, AMQDestination destination, + public abstract void sendQueueBind(final String queueName, final String routingKey, final Map<String,Object> arguments, + final String exchangeName, AMQDestination destination, final boolean nowait) throws AMQException, FailoverException; /** @@ -1033,7 +1031,7 @@ public abstract class AMQSession<C exten } public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal, - boolean exclusive, String selector, FieldTable rawSelector) throws JMSException + boolean exclusive, String selector, Map<String,Object> rawSelector) throws JMSException { checkValidDestination(destination); @@ -1086,7 +1084,7 @@ public abstract class AMQSession<C exten if (subscriber == null) { // After the address is resolved routing key will not be null. - AMQShortString topicName = dest.getRoutingKey(); + String topicName = dest.getRoutingKey(); if (_strictAMQP) { @@ -1120,7 +1118,9 @@ public abstract class AMQSession<C exten // says we must trash the subscription. boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); boolean isQueueBoundForTopicAndSelector = - isQueueBound(dest.getExchangeName().asString(), dest.getAMQQueueName().asString(), topicName.asString(), args); + isQueueBound(dest.getExchangeName().toString(), + dest.getAMQQueueName().toString(), + topicName.toString(), args); if (isQueueBound && !isQueueBoundForTopicAndSelector) { @@ -1130,8 +1130,12 @@ public abstract class AMQSession<C exten { try { - bindQueue(dest.getAMQQueueName(), dest.getRoutingKey(), - FieldTable.convertToFieldTable(args), dest.getExchangeName(), dest, true); + bindQueue(dest.getAMQQueueName(), + dest.getRoutingKey(), + args, + dest.getExchangeName(), + dest, + true); } catch(AMQException e) { @@ -1266,7 +1270,7 @@ public abstract class AMQSession<C exten { // For testing we may want to use the prefix return new AMQQueue(getDefaultQueueExchangeName(), - new AMQShortString(AMQDestination.stripSyntaxPrefix(queueName))); + AMQDestination.stripSyntaxPrefix(queueName)); } else { @@ -1301,7 +1305,7 @@ public abstract class AMQSession<C exten * @throws AMQException If the queue cannot be declared for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ - public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, + public void createQueue(final String name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException { createQueue(name, autoDelete, durable, exclusive, null); @@ -1321,7 +1325,7 @@ public abstract class AMQSession<C exten * @throws AMQException If the queue cannot be declared for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ - public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, + public void createQueue(final String name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException { new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() @@ -1334,7 +1338,7 @@ public abstract class AMQSession<C exten }, _connection).execute(); } - public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, + public abstract void sendCreateQueue(String name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException; /** @@ -1498,7 +1502,7 @@ public abstract class AMQSession<C exten createQueue(result.getAMQQueueName(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); bindQueue(result.getAMQQueueName(), result.getRoutingKey(), - new FieldTable(), result.getExchangeName(), result); + new HashMap<String, Object>(), result.getExchangeName(), result); return result; } catch (AMQException e) @@ -1558,7 +1562,7 @@ public abstract class AMQSession<C exten topicName = AMQDestination.stripSyntaxPrefix(topicName); if (syntax == AMQDestination.DestSyntax.BURL) { - return new AMQTopic(getDefaultTopicExchangeName(), new AMQShortString(topicName)); + return new AMQTopic(getDefaultTopicExchangeName(), topicName); } else { @@ -1578,7 +1582,7 @@ public abstract class AMQSession<C exten } } - public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException + public void declareExchange(String name, String type, boolean nowait) throws AMQException { declareExchange(name, type, nowait, false, false, false); } @@ -1620,12 +1624,12 @@ public abstract class AMQSession<C exten return _prefetchHighMark; } - public AMQShortString getDefaultQueueExchangeName() + public String getDefaultQueueExchangeName() { return _connection.getDefaultQueueExchangeName(); } - public AMQShortString getDefaultTopicExchangeName() + public String getDefaultTopicExchangeName() { return _connection.getDefaultTopicExchangeName(); } @@ -1635,12 +1639,12 @@ public abstract class AMQSession<C exten return _messageListener; } - public AMQShortString getTemporaryQueueExchangeName() + public String getTemporaryQueueExchangeName() { return _connection.getTemporaryQueueExchangeName(); } - public AMQShortString getTemporaryTopicExchangeName() + public String getTemporaryTopicExchangeName() { return _connection.getTemporaryTopicExchangeName(); } @@ -1720,16 +1724,16 @@ public abstract class AMQSession<C exten throws AMQException { - declareAndBind(amqd, new FieldTable()); + declareAndBind(amqd, new HashMap<String,Object>()); } - public void declareAndBind(AMQDestination amqd, FieldTable arguments) + public void declareAndBind(AMQDestination amqd, Map<String,Object> arguments) throws AMQException { declareExchange(amqd, false); - AMQShortString queueName = declareQueue(amqd, false); + String queueName = declareQueue(amqd, false); bindQueue(queueName, amqd.getRoutingKey(), arguments, amqd.getExchangeName(), amqd); } @@ -2015,7 +2019,7 @@ public abstract class AMQSession<C exten protected C createConsumerImpl(final Destination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, - final boolean exclusive, String selector, final FieldTable rawSelector, + final boolean exclusive, String selector, final Map<String,Object> rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { checkTemporaryDestination(destination); @@ -2103,9 +2107,14 @@ public abstract class AMQSession<C exten }, _connection).execute(); } - public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments, - final boolean noConsume, final boolean autoClose) throws JMSException; + public abstract C createMessageConsumer(final AMQDestination destination, + final int prefetchHigh, + final int prefetchLow, + final boolean noLocal, + final boolean exclusive, String selector, + final Map<String,Object> arguments, + final boolean noConsume, + final boolean autoClose) throws JMSException; /** * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer @@ -2165,7 +2174,7 @@ public abstract class AMQSession<C exten return _sessionInRecovery; } - boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName) throws JMSException + boolean isQueueBound(String exchangeName, String queueName) throws JMSException { return isQueueBound(exchangeName, queueName, null); } @@ -2184,7 +2193,7 @@ public abstract class AMQSession<C exten * @throws JMSException If the query fails for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ - public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + public abstract boolean isQueueBound(final String exchangeName, final String queueName, final String routingKey) throws JMSException; public abstract boolean isQueueBound(final AMQDestination destination) throws JMSException; @@ -2621,7 +2630,7 @@ public abstract class AMQSession<C exten * Register to consume from the queue. * @param queueName */ - private void consumeFromQueue(C consumer, AMQShortString queueName, boolean nowait) throws AMQException, FailoverException + private void consumeFromQueue(C consumer, String queueName, boolean nowait) throws AMQException, FailoverException { int tagId = _nextTag++; @@ -2696,7 +2705,7 @@ public abstract class AMQSession<C exten abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange) throws AMQException; - public abstract void sendConsume(C consumer, AMQShortString queueName, + public abstract void sendConsume(C consumer, String queueName, boolean nowait, int tag) throws AMQException, FailoverException; private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate) @@ -2798,9 +2807,9 @@ public abstract class AMQSession<C exten * @throws AMQException If the exchange cannot be declared for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ - void declareExchange(final AMQShortString name, final AMQShortString type, - final boolean nowait, final boolean durable, - final boolean autoDelete, final boolean internal) throws AMQException + void declareExchange(final String name, final String type, + final boolean nowait, final boolean durable, + final boolean autoDelete, final boolean internal) throws AMQException { new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { @@ -2812,9 +2821,9 @@ public abstract class AMQSession<C exten }, _connection).execute(); } - void declareExchange(final AMQShortString name, final AMQShortString type, + void declareExchange(final String name, final String type, final boolean nowait, final boolean durable, - final boolean autoDelete, final FieldTable arguments, + final boolean autoDelete, final Map<String,Object> arguments, final boolean passive) throws AMQException { new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() @@ -2827,8 +2836,8 @@ public abstract class AMQSession<C exten }, _connection).execute(); } - protected AMQShortString preprocessAddressTopic(final C consumer, - AMQShortString queueName) throws AMQException + protected String preprocessAddressTopic(final C consumer, + String queueName) throws AMQException { if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) { @@ -2847,16 +2856,16 @@ public abstract class AMQSession<C exten abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException; - public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + public abstract void sendExchangeDeclare(final String name, final String type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException; - public abstract void sendExchangeDeclare(final AMQShortString name, - final AMQShortString type, + public abstract void sendExchangeDeclare(final String name, + final String type, final boolean nowait, boolean durable, boolean autoDelete, - FieldTable arguments, + Map<String,Object> arguments, final boolean passive) throws AMQException, FailoverException; /** @@ -2879,21 +2888,21 @@ public abstract class AMQSession<C exten * TODO Verify the destiation is valid or throw an exception. * TODO Be aware of possible changes to parameter order as versions change. */ - protected AMQShortString declareQueue(final AMQDestination amqd, - final boolean noLocal) throws AMQException + protected String declareQueue(final AMQDestination amqd, + final boolean noLocal) throws AMQException { return declareQueue(amqd, noLocal, false); } - protected AMQShortString declareQueue(final AMQDestination amqd, - final boolean noLocal, final boolean nowait) + protected String declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait) throws AMQException { return declareQueue(amqd, noLocal, nowait, false); } - protected abstract AMQShortString declareQueue(final AMQDestination amqd, - final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException; + protected abstract String declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException; /** * Undeclares the specified queue. @@ -2905,7 +2914,7 @@ public abstract class AMQSession<C exten * @throws JMSException If the queue could not be deleted for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ - protected void deleteQueue(final AMQShortString queueName) throws JMSException + protected void deleteQueue(final String queueName) throws JMSException { try { @@ -2939,7 +2948,7 @@ public abstract class AMQSession<C exten deleteQueue(amqQueue.getAMQQueueName()); } - public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException; + public abstract void sendQueueDelete(final String queueName) throws AMQException, FailoverException; private long getNextProducerId() { @@ -3030,7 +3039,7 @@ public abstract class AMQSession<C exten } - AMQShortString queueName = amqd.getAMQQueueName(); + String queueName = amqd.getAMQQueueName(); // store the consumer queue name consumer.setQueuename(queueName); @@ -3073,7 +3082,7 @@ public abstract class AMQSession<C exten } } - protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey) + protected abstract boolean isBound(String exchangeName, String amqQueueName, String routingKey) throws AMQException; private void registerProducer(long producerId, MessageProducer producer)
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1683186&r1=1683185&r2=1683186&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Jun 2 20:53:49 2015 @@ -48,7 +48,6 @@ import org.apache.qpid.client.failover.F import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.message.AMQMessageDelegateFactory; -import org.apache.qpid.client.message.FieldTableSupport; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.messaging.address.AddressHelper; @@ -56,8 +55,6 @@ import org.apache.qpid.client.messaging. import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; import org.apache.qpid.util.Serial; @@ -339,29 +336,28 @@ public class AMQSession_0_10 extends AMQ * @param routingKey Specifies the routing key for the binding. * @param arguments 0_8 specific */ - public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, - final FieldTable arguments, final AMQShortString exchangeName, + @Override + public void sendQueueBind(final String queueName, final String routingKey, + final Map<String,Object> arguments, final String exchangeName, final AMQDestination destination, final boolean nowait) throws AMQException { if (destination == null || destination.getDestSyntax() == DestSyntax.BURL) { - Map args = FieldTableSupport.convertToMap(arguments); - if(destination != null) { - for (AMQShortString rk: destination.getBindingKeys()) + for (String rk: destination.getBindingKeys()) { - doSendQueueBind(queueName, exchangeName, args, rk); + doSendQueueBind(queueName, exchangeName, arguments, rk); } if(!Arrays.asList(destination.getBindingKeys()).contains(routingKey)) { - doSendQueueBind(queueName, exchangeName, args, routingKey); + doSendQueueBind(queueName, exchangeName, arguments, routingKey); } } else { - doSendQueueBind(queueName, exchangeName, args, routingKey); + doSendQueueBind(queueName, exchangeName, arguments, routingKey); } } else @@ -382,12 +378,9 @@ public class AMQSession_0_10 extends AMQ { continue; } - String queue = binding.getQueue() == null? - queueName.asString(): binding.getQueue(); + String queue = binding.getQueue() == null ? queueName : binding.getQueue(); - String exchange = binding.getExchange() == null ? - defaultExchange : - binding.getExchange(); + String exchange = binding.getExchange() == null ? defaultExchange : binding.getExchange(); _logger.debug("Binding queue : " + queue + " exchange: " + exchange + @@ -404,17 +397,17 @@ public class AMQSession_0_10 extends AMQ } } - private void doSendQueueBind(final AMQShortString queueName, - final AMQShortString exchangeName, + private void doSendQueueBind(final String queueName, + final String exchangeName, final Map args, - final AMQShortString rk) + final String rk) { - _logger.debug("Binding queue : " + queueName.toString() + - " exchange: " + exchangeName.toString() + - " using binding key " + rk.asString()); - getQpidSession().exchangeBind(queueName.toString(), - exchangeName.toString(), - rk.toString(), + _logger.debug("Binding queue : " + queueName + + " exchange: " + exchangeName + + " using binding key " + rk); + getQpidSession().exchangeBind(queueName, + exchangeName, + rk, args); } @@ -460,10 +453,10 @@ public class AMQSession_0_10 extends AMQ * @throws AMQException * @throws FailoverException */ - public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, + public void sendCreateQueue(String name, final boolean autoDelete, final boolean durable, final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException { - getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE, + getQpidSession().queueDeclare(name, null, arguments, durable ? Option.DURABLE : Option.NONE, autoDelete ? Option.AUTO_DELETE : Option.NONE, exclusive ? Option.EXCLUSIVE : Option.NONE); // We need to sync so that we get notify of an error. @@ -557,7 +550,7 @@ public class AMQSession_0_10 extends AMQ public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, - final FieldTable rawSelector, final boolean noConsume, + final Map<String,Object> rawSelector, final boolean noConsume, final boolean autoClose) throws JMSException { return new BasicMessageConsumer_0_10(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, @@ -569,9 +562,9 @@ public class AMQSession_0_10 extends AMQ * Bind a queue with an exchange. */ - public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + public boolean isQueueBound(final String exchangeName, final String queueName, final String routingKey) { - return isQueueBound(exchangeName,queueName,routingKey,null); + return isQueueBound(exchangeName,queueName,routingKey,(Map<String,Object>)null); } public boolean isQueueBound(final AMQDestination destination) @@ -579,19 +572,19 @@ public class AMQSession_0_10 extends AMQ return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys()); } - public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys) + public boolean isQueueBound(final String exchangeName, final String queueName, final String routingKey, String[] bindingKeys) { String rk = null; if (bindingKeys != null && bindingKeys.length>0) { - rk = bindingKeys[0].toString(); + rk = bindingKeys[0]; } else if (routingKey != null) { - rk = routingKey.toString(); + rk = routingKey; } - return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null); + return isQueueBound(exchangeName == null ? null : exchangeName,queueName == null ? null : queueName,rk,(Map<String,Object>)null); } public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args) @@ -621,7 +614,7 @@ public class AMQSession_0_10 extends AMQ } @Override - protected boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey) + protected boolean isBound(String exchangeName, String amqQueueName, String routingKey) { return isQueueBound(exchangeName, amqQueueName, routingKey); } @@ -630,7 +623,7 @@ public class AMQSession_0_10 extends AMQ * This method is invoked when a consumer is created * Registers the consumer with the broker */ - public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, + public void sendConsume(BasicMessageConsumer_0_10 consumer, String queueName, boolean nowait, int tag) throws AMQException, FailoverException { @@ -640,7 +633,7 @@ public class AMQSession_0_10 extends AMQ AMQDestination destination = consumer.getDestination(); long capacity = consumer.getCapacity(); - Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + Map<String, Object> arguments = consumer.getArguments(); Link link = destination.getLink(); if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) @@ -650,7 +643,7 @@ public class AMQSession_0_10 extends AMQ boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; - String queue = queueName == null ? destination.getAddressName() : queueName.toString(); + String queue = queueName == null ? destination.getAddressName() : queueName; getQpidSession().messageSubscribe (queue, String.valueOf(tag), acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, @@ -706,18 +699,18 @@ public class AMQSession_0_10 extends AMQ /** * creates an exchange if it does not already exist */ - public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + public void sendExchangeDeclare(final String name, final String type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { //The 'internal' parameter is ignored on the 0-10 path, the protocol does not support it - sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete); + sendExchangeDeclare(name, type, null, null, nowait, durable, autoDelete); } - public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, - boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException + public void sendExchangeDeclare(final String name, final String type, final boolean nowait, + boolean durable, boolean autoDelete, Map<String,Object> arguments, final boolean passive) throws AMQException, FailoverException { - sendExchangeDeclare(name.asString(), type.asString(), null, - arguments == null ? null : FieldTableSupport.convertToMap(arguments), + sendExchangeDeclare(name, type, null, + arguments, nowait, durable, autoDelete); } @@ -731,7 +724,7 @@ public class AMQSession_0_10 extends AMQ type, alternateExchange, args, - name.toString().startsWith("amq.") ? Option.PASSIVE : Option.NONE, + name.startsWith("amq.") ? Option.PASSIVE : Option.NONE, durable ? Option.DURABLE : Option.NONE, autoDelete ? Option.AUTO_DELETE : Option.NONE); // We need to sync so that we get notify of an error. @@ -758,15 +751,15 @@ public class AMQSession_0_10 extends AMQ /** * Declare a queue with the given queueName */ - public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal, - final boolean nowait, boolean passive) + public String send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal, + final boolean nowait, boolean passive) throws AMQException { - AMQShortString queueName; + String queueName; if (amqd.getAMQQueueName() == null) { // generate a name for this queue - queueName = new AMQShortString(createTemporaryQueueName()); + queueName = createTemporaryQueueName(); amqd.setQueueName(queueName); } else @@ -782,7 +775,7 @@ public class AMQSession_0_10 extends AMQ arguments.put(AddressHelper.NO_LOCAL, true); } - getQpidSession().queueDeclare(queueName.toString(), "" , arguments, + getQpidSession().queueDeclare(queueName, "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE, @@ -794,7 +787,7 @@ public class AMQSession_0_10 extends AMQ Node node = amqd.getNode(); Map<String,Object> arguments = new HashMap<String,Object>(); arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs()); - if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null) + if (arguments.get(AddressHelper.NO_LOCAL) == null) { arguments.put(AddressHelper.NO_LOCAL, noLocal); } @@ -817,9 +810,9 @@ public class AMQSession_0_10 extends AMQ /** * deletes a queue */ - public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException + public void sendQueueDelete(final String queueName) throws AMQException, FailoverException { - getQpidSession().queueDelete(queueName.toString()); + getQpidSession().queueDelete(queueName); // ifEmpty --> false // ifUnused --> false // We need to sync so that we get notify of an error. @@ -958,25 +951,25 @@ public class AMQSession_0_10 extends AMQ } @Override - protected AMQShortString declareQueue(final AMQDestination amqd, + protected String declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - return new FailoverNoopSupport<AMQShortString, AMQException>( - new FailoverProtectedOperation<AMQShortString, AMQException>() + return new FailoverNoopSupport<String, AMQException>( + new FailoverProtectedOperation<String, AMQException>() { - public AMQShortString execute() throws AMQException, FailoverException + public String execute() throws AMQException, FailoverException { // Generate the queue name if the destination indicates that a client generated name is to be used. if (amqd.isNameRequired()) { String binddingKey = ""; - for(AMQShortString key : amqd.getBindingKeys()) + for(String key : amqd.getBindingKeys()) { - binddingKey = binddingKey + "_" + key.toString(); + binddingKey = binddingKey + "_" + key; } - amqd.setQueueName(new AMQShortString( binddingKey + "@" - + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); + amqd.setQueueName(binddingKey + "@" + + amqd.getExchangeName() + "_" + UUID.randomUUID()); } return send0_10QueueDeclare(amqd, noLocal, nowait, passive); } @@ -1124,7 +1117,7 @@ public class AMQSession_0_10 extends AMQ { _logger.debug("Setting Exchange type " + result.getType()); node.setExchangeType(result.getType()); - dest.setExchangeClass(new AMQShortString(result.getType())); + dest.setExchangeClass(result.getType()); } } @@ -1254,7 +1247,7 @@ public class AMQSession_0_10 extends AMQ if (queueName == null) { queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName(); - dest.setQueueName(new AMQShortString(queueName)); + dest.setQueueName(queueName); } SubscriptionQueue queueProps = link.getSubscriptionQueue(); Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1683186&r1=1683185&r2=1683186&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Tue Jun 2 20:53:49 2015 @@ -56,7 +56,6 @@ import org.apache.qpid.client.messaging. import org.apache.qpid.client.messaging.address.Link; import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; @@ -133,11 +132,16 @@ public class AMQSession_0_8 extends AMQS AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, + this(con, + channelId, + transacted, + acknowledgeMode, + MessageFactoryRegistry.newDefaultRegistry(), + defaultPrefetchHigh, defaultPrefetchLow); } - private ProtocolVersion getProtocolVersion() + ProtocolVersion getProtocolVersion() { return getProtocolHandler().getProtocolVersion(); } @@ -194,8 +198,8 @@ public class AMQSession_0_8 extends AMQS getUnacknowledgedMessageTags().remove(deliveryTag); } - public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination destination, + public void sendQueueBind(final String queueName, final String routingKey, final Map<String,Object> arguments, + final String exchangeName, final AMQDestination destination, final boolean nowait) throws AMQException, FailoverException { if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL) @@ -225,7 +229,7 @@ public class AMQSession_0_8 extends AMQS continue; } String queue = binding.getQueue() == null? - queueName.asString(): binding.getQueue(); + queueName.toString() : binding.getQueue(); String exchange = binding.getExchange() == null ? defaultExchange : @@ -253,8 +257,11 @@ public class AMQSession_0_8 extends AMQS { getProtocolHandler().closeSession(this); - getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), - new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(getChannelId()), + getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry() + .createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), + new AMQShortString( + "JMS client closing channel"), 0, 0) + .generateFrame(getChannelId()), ChannelCloseOkBody.class, timeout); // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully. @@ -281,19 +288,10 @@ public class AMQSession_0_8 extends AMQS _currentPrefetch.set(0); } - public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException, + public void sendCreateQueue(String name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException { - FieldTable table = null; - if(arguments != null && !arguments.isEmpty()) - { - table = new FieldTable(); - for(Map.Entry<String, Object> entry : arguments.entrySet()) - { - table.setObject(entry.getKey(), entry.getValue()); - } - } - sendQueueDeclare(name, durable, exclusive, autoDelete, table, false); + sendQueueDeclare(name, durable, exclusive, autoDelete, arguments, false); } public void sendRecover() throws AMQException, FailoverException @@ -314,7 +312,7 @@ public class AMQSession_0_8 extends AMQS { // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad // in 0-9 we used the cleaner addition of a new sync recover method with its own ok - if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0)) + if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v0_8)) { BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class); @@ -422,7 +420,7 @@ public class AMQSession_0_8 extends AMQS } - public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + public boolean isQueueBound(final String exchangeName, final String queueName, final String routingKey) throws JMSException { try @@ -455,7 +453,7 @@ public class AMQSession_0_8 extends AMQS * Returns false if not connected to a Qpid broker which supports the necessary AMQP extension. */ @Override - protected boolean isBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + protected boolean isBound(final String exchangeName, final String queueName, final String routingKey) throws AMQException { if(!getAMQConnection().getDelegate().supportsIsBound()) @@ -480,7 +478,7 @@ public class AMQSession_0_8 extends AMQS } - protected boolean exchangeExists(final AMQShortString exchangeName) + protected boolean exchangeExists(final String exchangeName) throws AMQException { if(!getAMQConnection().getDelegate().supportsIsBound()) @@ -505,9 +503,9 @@ public class AMQSession_0_8 extends AMQS return (responseBody.getReplyCode() == 0 || responseBody.getReplyCode() == 3); } - private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName, - AMQShortString routingKey, - AMQShortString queueName) throws AMQException, FailoverException + private AMQMethodEvent sendExchangeBound(String exchangeName, + String routingKey, + String queueName) throws AMQException, FailoverException { AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody (exchangeName, routingKey, queueName).generateFrame(getChannelId()); @@ -517,15 +515,15 @@ public class AMQSession_0_8 extends AMQS @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, - AMQShortString queueName, - boolean nowait, - int tag) throws AMQException, FailoverException + String queueName, + boolean nowait, + int tag) throws AMQException, FailoverException { queueName = preprocessAddressTopic(consumer, queueName); AMQDestination destination = consumer.getDestination(); - Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments()); + Map<String, Object> arguments = consumer.getArguments(); Link link = destination.getLink(); if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) @@ -535,12 +533,12 @@ public class AMQSession_0_8 extends AMQS BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, - new AMQShortString(String.valueOf(tag)), + String.valueOf(tag), consumer.isNoLocal(), consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, consumer.isExclusive(), nowait, - FieldTable.convertToFieldTable(arguments)); + arguments); AMQFrame jmsConsume = body.generateFrame(getChannelId()); @@ -564,7 +562,7 @@ public class AMQSession_0_8 extends AMQS if (dest.getQueueName() == null) { queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName(); - dest.setQueueName(new AMQShortString(queueName)); + dest.setQueueName(queueName); } else { @@ -590,11 +588,11 @@ public class AMQSession_0_8 extends AMQS { // not setting alternate exchange - sendQueueDeclare(AMQShortString.valueOf(queueName), + sendQueueDeclare(queueName, link.isDurable(), queueProps.isExclusive(), queueProps.isAutoDelete(), - FieldTable.convertToFieldTable(arguments), + arguments, false); return null; @@ -603,7 +601,7 @@ public class AMQSession_0_8 extends AMQS Map<String,Object> bindingArguments = new HashMap<String, Object>(); - bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments); doBind(dest, binding, queueName, dest.getAddressName()); @@ -611,13 +609,15 @@ public class AMQSession_0_8 extends AMQS } @Override - public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + public void sendExchangeDeclare(final String name, final String type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path - ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type, - name.toString().startsWith("amq."), + ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(), + name, + type, + name.startsWith("amq."), durable, autoDelete, internal, false, null); AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); @@ -625,8 +625,9 @@ public class AMQSession_0_8 extends AMQS } @Override - public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, - boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException + public void sendExchangeDeclare(final String name, final String type, final boolean nowait, + boolean durable, boolean autoDelete, Map<String,Object> arguments, + final boolean passive) throws AMQException, FailoverException { //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path @@ -634,7 +635,8 @@ public class AMQSession_0_8 extends AMQS ExchangeDeclareBody body = methodRegistry.createExchangeDeclareBody(getTicket(), name, type, - passive || name.toString().startsWith("amq."), + passive || name.toString() + .startsWith("amq."), durable, autoDelete, false, @@ -648,7 +650,7 @@ public class AMQSession_0_8 extends AMQS public void sendExchangeDelete(final String name) throws AMQException, FailoverException { ExchangeDeleteBody body = - getMethodRegistry().createExchangeDeleteBody(getTicket(),AMQShortString.valueOf(name),false, false); + getMethodRegistry().createExchangeDeleteBody(getTicket(), name, false, false); AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); @@ -656,18 +658,17 @@ public class AMQSession_0_8 extends AMQS private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException { - AMQShortString queueName = amqd.getAMQQueueName(); + String queueName = amqd.getAMQQueueName(); boolean durable = amqd.isDurable(); boolean exclusive = amqd.isExclusive(); boolean autoDelete = amqd.isAutoDelete(); - FieldTable arguments = null; - sendQueueDeclare(queueName, durable, exclusive, autoDelete, arguments, passive); + sendQueueDeclare(queueName, durable, exclusive, autoDelete, null, passive); } - private void sendQueueDeclare(final AMQShortString queueName, + private void sendQueueDeclare(final String queueName, final boolean durable, final boolean exclusive, - final boolean autoDelete, final FieldTable arguments, final boolean passive) + final boolean autoDelete, final Map<String,Object> arguments, final boolean passive) throws AMQException, FailoverException { QueueDeclareBody body = @@ -686,16 +687,16 @@ public class AMQSession_0_8 extends AMQS } @Override - protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, - final boolean nowait, final boolean passive) throws AMQException + protected String declareQueue(final AMQDestination amqd, final boolean noLocal, + final boolean nowait, final boolean passive) throws AMQException { //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new FailoverNoopSupport<AMQShortString, AMQException>( - new FailoverProtectedOperation<AMQShortString, AMQException>() + return new FailoverNoopSupport<String, AMQException>( + new FailoverProtectedOperation<String, AMQException>() { - public AMQShortString execute() throws AMQException, FailoverException + public String execute() throws AMQException, FailoverException { // Generate the queue name if the destination indicates that a client generated name is to be used. if (amqd.isNameRequired()) @@ -710,7 +711,7 @@ public class AMQSession_0_8 extends AMQS }, getAMQConnection()).execute(); } - public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException + public void sendQueueDelete(final String queueName) throws AMQException, FailoverException { QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(), queueName, @@ -730,7 +731,7 @@ public class AMQSession_0_8 extends AMQS } public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments, + final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final Map<String,Object> arguments, final boolean noConsume, final boolean autoClose) throws JMSException { return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal, @@ -776,8 +777,8 @@ public class AMQSession_0_8 extends AMQS AbstractJMSMessage bouncedMessage = getMessageFactoryRegistry().createMessage(0, false, - msg.getExchange(), - msg.getRoutingKey(), + AMQShortString.toString(msg.getExchange()), + AMQShortString.toString(msg.getRoutingKey()), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, @@ -1006,7 +1007,7 @@ public class AMQSession_0_8 extends AMQS public void sync() throws AMQException { - declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); + declareExchange("amq.direct", "direct", false); } @Override @@ -1035,8 +1036,8 @@ public class AMQSession_0_8 extends AMQS } else { - boolean isExchange = exchangeExists(AMQShortString.valueOf(name)); - boolean isQueue = isBound(null,AMQShortString.valueOf(name), null); + boolean isExchange = exchangeExists(name); + boolean isQueue = isBound(null, name, null); if (!isExchange && !isQueue) { @@ -1082,11 +1083,11 @@ public class AMQSession_0_8 extends AMQS public Void execute() throws AMQException, FailoverException { - sendQueueDeclare(AMQShortString.valueOf(dest.getAddressName()), + sendQueueDeclare(dest.getAddressName(), node.isDurable(), node.isExclusive(), node.isAutoDelete(), - FieldTable.convertToFieldTable(arguments), + arguments, false); return null; @@ -1110,12 +1111,12 @@ public class AMQSession_0_8 extends AMQS } // can't set alt. exchange - declareExchange(AMQShortString.valueOf(dest.getAddressName()), - AMQShortString.valueOf(node.getExchangeType()), + declareExchange(dest.getAddressName(), + node.getExchangeType(), false, node.isDurable(), node.isAutoDelete(), - FieldTable.convertToFieldTable(arguments), false); + arguments, false); // If bindings are specified without a queue name and is called by the producer, // the broker will send an exception as expected. @@ -1140,11 +1141,11 @@ public class AMQSession_0_8 extends AMQS MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); QueueBindBody queueBindBody = methodRegistry.createQueueBindBody(getTicket(), - AMQShortString.valueOf(queue), - AMQShortString.valueOf(exchange), - AMQShortString.valueOf(bindingKey), + queue, + exchange, + bindingKey, false, - FieldTable.convertToFieldTable(binding.getArgs())); + binding.getArgs()); getProtocolHandler().syncWrite(queueBindBody. generateFrame(getChannelId()), QueueBindOkBody.class); @@ -1164,10 +1165,10 @@ public class AMQSession_0_8 extends AMQS public Object execute() throws AMQException, FailoverException { - if (isBound(null, AMQShortString.valueOf(queue), null)) + if (isBound(null, queue, null)) { - if(ProtocolVersion.v8_0.equals(getProtocolVersion())) + if(ProtocolVersion.v0_8.equals(getProtocolVersion())) { throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); } @@ -1205,7 +1206,7 @@ public class AMQSession_0_8 extends AMQS final boolean durable, final boolean autoDelete, final boolean exclusive, final Map<String, Object> args) throws AMQException { - boolean match = isBound(null,AMQShortString.valueOf(queueName), null); + boolean match = isBound(null, queueName, null); if (assertNode) { @@ -1223,11 +1224,11 @@ public class AMQSession_0_8 extends AMQS public Void execute() throws AMQException, FailoverException { - sendQueueDeclare(AMQShortString.valueOf(queueName), + sendQueueDeclare(queueName, durable, exclusive, autoDelete, - FieldTable.convertToFieldTable(args), + args, true); return null; @@ -1243,7 +1244,7 @@ public class AMQSession_0_8 extends AMQS public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException { - boolean match = exchangeExists(AMQShortString.valueOf(dest.getAddressName())); + boolean match = exchangeExists(dest.getAddressName()); Node node = dest.getNode(); @@ -1252,12 +1253,12 @@ public class AMQSession_0_8 extends AMQS if (assertNode) { - declareExchange(AMQShortString.valueOf(dest.getAddressName()), - AMQShortString.valueOf(node.getExchangeType()), + declareExchange(dest.getAddressName(), + node.getExchangeType(), false, node.isDurable(), node.isAutoDelete(), - FieldTable.convertToFieldTable(node.getDeclareArgs()), true); + node.getDeclareArgs(), true); } else @@ -1266,7 +1267,7 @@ public class AMQSession_0_8 extends AMQS /* _logger.debug("Setting Exchange type " + result.getType()); node.setExchangeType(result.getType()); - dest.setExchangeClass(new AMQShortString(result.getType())); + dest.setExchangeClass(result.getType()); */ } @@ -1311,7 +1312,7 @@ public class AMQSession_0_8 extends AMQS { public Object execute() throws AMQException, FailoverException { - sendQueueDelete(AMQShortString.valueOf(dest.getAddressName())); + sendQueueDelete(dest.getAddressName()); return null; } }, getAMQConnection()).execute(); @@ -1371,7 +1372,7 @@ public class AMQSession_0_8 extends AMQS public Void execute() throws AMQException, FailoverException { - sendQueueDelete(AMQShortString.valueOf(dest.getQueueName())); + sendQueueDelete(dest.getQueueName()); return null; } }, getAMQConnection())).execute(); @@ -1399,9 +1400,9 @@ public class AMQSession_0_8 extends AMQS public boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String, Object> args) throws JMSException { - return isQueueBound(exchangeName == null ? null : new AMQShortString(exchangeName), - queueName == null ? null : new AMQShortString(queueName), - bindingKey == null ? null : new AMQShortString(bindingKey)); + return isQueueBound(exchangeName, + queueName, + bindingKey); } private AMQProtocolHandler getProtocolHandler() @@ -1497,17 +1498,17 @@ public class AMQSession_0_8 extends AMQS public abstract static class DestinationCache<T extends AMQDestination> { - private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>(); + private final Map<String, Map<String, T>> cache = new HashMap<String, Map<String, T>>(); - public T getDestination(AMQShortString exchangeName, AMQShortString routingKey) + public T getDestination(String exchangeName, String routingKey) { - Map<AMQShortString, T> routingMap = cache.get(exchangeName); + Map<String, T> routingMap = cache.get(exchangeName); if(routingMap == null) { - routingMap = new LinkedHashMap<AMQShortString, T>() + routingMap = new LinkedHashMap<String, T>() { - protected boolean removeEldestEntry(Map.Entry<AMQShortString, T> eldest) + protected boolean removeEldestEntry(Map.Entry<String, T> eldest) { return size() >= 200; } @@ -1523,12 +1524,12 @@ public class AMQSession_0_8 extends AMQS return destination; } - protected abstract T newDestination(AMQShortString exchangeName, AMQShortString routingKey); + protected abstract T newDestination(String exchangeName, String routingKey); } private static class TopicDestinationCache extends DestinationCache<AMQTopic> { - protected AMQTopic newDestination(AMQShortString exchangeName, AMQShortString routingKey) + protected AMQTopic newDestination(String exchangeName, String routingKey) { return new AMQTopic(exchangeName, routingKey, null); } @@ -1536,7 +1537,7 @@ public class AMQSession_0_8 extends AMQS private static class QueueDestinationCache extends DestinationCache<AMQQueue> { - protected AMQQueue newDestination(AMQShortString exchangeName, AMQShortString routingKey) + protected AMQQueue newDestination(String exchangeName, String routingKey) { return new AMQQueue(exchangeName, routingKey, routingKey); } Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?rev=1683186&r1=1683185&r2=1683186&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Tue Jun 2 20:53:49 2015 @@ -36,7 +36,7 @@ final class AMQTemporaryQueue extends AM /** Create a new instance of an AMQTemporaryQueue */ public AMQTemporaryQueue(AMQSession session) { - super(session.getTemporaryQueueExchangeName(), new AMQShortString(session.createTemporaryQueueName()), true); + super(session.getTemporaryQueueExchangeName(), session.createTemporaryQueueName(), true); _session = session; } Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?rev=1683186&r1=1683185&r2=1683186&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Tue Jun 2 20:53:49 2015 @@ -40,7 +40,7 @@ class AMQTemporaryTopic extends AMQTopic */ public AMQTemporaryTopic(AMQSession session) { - super(session.getTemporaryTopicExchangeName(),new AMQShortString("tmp_" + UUID.randomUUID())); + super(session.getTemporaryTopicExchangeName(),"tmp_" + UUID.randomUUID()); _session = session; } Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTopic.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1683186&r1=1683185&r2=1683186&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQTopic.java Tue Jun 2 20:53:49 2015 @@ -34,6 +34,7 @@ import org.apache.qpid.url.BindingURL; public class AMQTopic extends AMQDestination implements Topic { + private static final long serialVersionUID = -4773561540716587036L; public AMQTopic(String address) throws URISyntaxException @@ -61,63 +62,53 @@ public class AMQTopic extends AMQDestina super(binding); } - public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) + public AMQTopic(String exchange, String routingKey, String queueName) { - super(exchange, AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS), routingKey, true, true, queueName, false); + super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false); } - public AMQTopic(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName,AMQShortString[] bindingKeys) + public AMQTopic(String exchange, String routingKey, String queueName, String[] bindingKeys) { - super(exchange, AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS), routingKey, true, true, queueName, false,bindingKeys); + super(exchange, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, routingKey, true, true, queueName, false, bindingKeys); } public AMQTopic(AMQConnection conn, String routingKey) { - this(conn.getDefaultTopicExchangeName(), new AMQShortString(routingKey)); + this(conn.getDefaultTopicExchangeName(), routingKey); } public AMQTopic(String exchangeName, String routingKey) { - this(AMQShortString.valueOf(exchangeName), new AMQShortString(routingKey)); - } - - public AMQTopic(AMQShortString exchangeName, String routingKey) - { - this(exchangeName, new AMQShortString(routingKey)); - } - - public AMQTopic(AMQShortString exchangeName, AMQShortString routingKey) - { this(exchangeName, routingKey, null); } - public AMQTopic(AMQShortString exchangeName, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + public AMQTopic(String exchangeName, String name, boolean isAutoDelete, String queueName, boolean isDurable) { - super(exchangeName, AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS), name, true, isAutoDelete, queueName, isDurable); + super(exchangeName, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete, queueName, isDurable); } - protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString name, boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + protected AMQTopic(String exchangeName, String exchangeClass, String name, boolean isAutoDelete, String queueName, boolean isDurable) { super(exchangeName, exchangeClass, name, true, isAutoDelete, queueName, isDurable); } - protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, - boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + protected AMQTopic(String exchangeName, String exchangeClass, String routingKey, boolean isExclusive, + boolean isAutoDelete, String queueName, boolean isDurable) { super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable ); } - protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, - boolean isAutoDelete, AMQShortString queueName, boolean isDurable,AMQShortString[] bindingKeys) + protected AMQTopic(String exchangeName, String exchangeClass, String routingKey, boolean isExclusive, + boolean isAutoDelete, String queueName, boolean isDurable, String[] bindingKeys) { - super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys); + super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable, bindingKeys); } public static AMQTopic createDurableTopic(Topic topic, String subscriptionName, AMQConnection connection) throws JMSException { - if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic) + if (topic instanceof AMQDestination) { AMQDestination qpidTopic = (AMQDestination)topic; if (qpidTopic.getDestSyntax() == DestSyntax.ADDR) @@ -125,9 +116,9 @@ public class AMQTopic extends AMQDestina try { AMQTopic t = new AMQTopic(qpidTopic.getAddress()); - AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection); + String queueName = getDurableTopicQueueName(subscriptionName, connection); // link is never null if dest was created using an address string. - t.getLink().setName(queueName.asString()); + t.getLink().setName(queueName); t.getLink().getSubscriptionQueue().setAutoDelete(false); t.getLink().setDurable(true); @@ -155,16 +146,16 @@ public class AMQTopic extends AMQDestina } } - public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException + public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException { - return new AMQShortString(connection.getClientID() + ":" + subscriptionName); + return connection.getClientID() + ":" + subscriptionName; } public String getTopicName() throws JMSException { if (getRoutingKey() != null) { - return getRoutingKey().asString(); + return getRoutingKey().toString(); } else if (getSubject() != null) { @@ -177,11 +168,11 @@ public class AMQTopic extends AMQDestina } @Override - public AMQShortString getExchangeName() + public String getExchangeName() { if (super.getExchangeName() == null && super.getAddressName() != null) { - return new AMQShortString(super.getAddressName()); + return super.getAddressName(); } else { @@ -189,7 +180,7 @@ public class AMQTopic extends AMQDestina } } - public AMQShortString getRoutingKey() + public String getRoutingKey() { if (super.getRoutingKey() != null) { @@ -197,11 +188,11 @@ public class AMQTopic extends AMQDestina } else if (getSubject() != null) { - return new AMQShortString(getSubject()); + return getSubject(); } else { - setRoutingKey(new AMQShortString("")); + setRoutingKey(""); setSubject(""); return super.getRoutingKey(); } @@ -212,19 +203,6 @@ public class AMQTopic extends AMQDestina return !isDurable(); } - /** - * Override since the queue is always private and we must ensure it remains null. If not, - * reuse of the topic when registering consumers will make all consumers listen on the same (private) queue rather - * than getting their own (private) queue. - * <p> - * This is relatively nasty but it is difficult to come up with a more elegant solution, given - * the requirement in the case on AMQQueue and possibly other AMQDestination subclasses to - * use the underlying queue name even where it is server generated. - */ - public void setQueueName(String queueName) - { - } - public boolean equals(Object o) { if (getDestSyntax() == DestSyntax.ADDR) Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java?rev=1683186&r1=1683185&r2=1683186&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java Tue Jun 2 20:53:49 2015 @@ -20,16 +20,14 @@ */ package org.apache.qpid.client; -import org.apache.qpid.framing.AMQShortString; - public class AMQUndefinedDestination extends AMQDestination { - private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown"); - private static final long serialVersionUID = -1487224209485888847L; + private static final String UNKNOWN_EXCHANGE_CLASS = "unknown"; + private static final long serialVersionUID = -3938019873332367947L; - public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) + public AMQUndefinedDestination(String exchange, String routingKey, String queueName) { super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName); } Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1683186&r1=1683185&r2=1683186&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jun 2 20:53:49 2015 @@ -22,8 +22,10 @@ package org.apache.qpid.client; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -50,9 +52,6 @@ import org.apache.qpid.client.message.Cl import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.util.JMSExceptionHelper; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.apache.qpid.transport.TransportException; @@ -91,7 +90,7 @@ public abstract class BasicMessageConsum /** * We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ - private final FieldTable _arguments; + private final Map<String,Object> _arguments; /** * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of @@ -125,7 +124,7 @@ public abstract class BasicMessageConsum * Used to store this consumer queue name * Usefull when more than binding key should be used */ - private AMQShortString _queuename; + private String _queuename; /** * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive @@ -141,7 +140,7 @@ public abstract class BasicMessageConsum protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, - AMQSession session, FieldTable rawSelector, + AMQSession session, Map<String,Object> rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { @@ -186,15 +185,15 @@ public abstract class BasicMessageConsum _acknowledgeMode = acknowledgeMode; } - final FieldTable ft = FieldTableFactory.newFieldTable(); + final Map<String,Object> ft = new HashMap<>(); if(destination.getConsumerArguments() != null) { - ft.addAll(FieldTable.convertToFieldTable(destination.getConsumerArguments())); + ft.putAll(destination.getConsumerArguments()); } // rawSelector is used by HeadersExchange and is not a JMS Selector if (rawSelector != null) { - ft.addAll(rawSelector); + ft.putAll(rawSelector); } // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a @@ -353,7 +352,7 @@ public abstract class BasicMessageConsum _receivingThread = null; } - public FieldTable getArguments() + public Map<String,Object> getArguments() { return _arguments; } @@ -1019,12 +1018,12 @@ public abstract class BasicMessageConsum return tags; } - public AMQShortString getQueuename() + public String getQueuename() { return _queuename; } - public void setQueuename(AMQShortString queuename) + public void setQueuename(String queuename) { this._queuename = queuename; } Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1683186&r1=1683185&r2=1683186&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Jun 2 20:53:49 2015 @@ -18,6 +18,7 @@ package org.apache.qpid.client; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.JMSException; @@ -36,7 +37,6 @@ import org.apache.qpid.client.message.Me import org.apache.qpid.client.message.UnprocessedMessage_0_10; import org.apache.qpid.client.util.JMSExceptionHelper; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.Acquired; @@ -82,7 +82,7 @@ public class BasicMessageConsumer_0_10 e protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, - AMQSession<?,?> session, FieldTable rawSelector, + AMQSession<?,?> session, Map<String,Object> rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
