Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1689876&r1=1689875&r2=1689876&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jul 8 13:50:16 2015 @@ -44,9 +44,10 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQDisconnectedException; -import org.apache.qpid.AMQException; +import org.apache.qpid.QpidException; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; @@ -311,9 +312,9 @@ public abstract class AMQSession<C exten return _immediatePrefetch; } - abstract void handleNodeDelete(final AMQDestination dest) throws AMQException; + abstract void handleNodeDelete(final AMQDestination dest) throws QpidException; - abstract void handleLinkDelete(final AMQDestination dest) throws AMQException; + abstract void handleLinkDelete(final AMQDestination dest) throws QpidException; /** * Creates a new session on a connection. @@ -446,7 +447,7 @@ public abstract class AMQSession<C exten close(-1); } - public abstract AMQException getLastException(); + public abstract QpidException getLastException(); public void checkNotClosed() throws JMSException { @@ -456,11 +457,18 @@ public abstract class AMQSession<C exten } catch (IllegalStateException ise) { - AMQException ex = getLastException(); + QpidException ex = getLastException(); if (ex != null) { + AMQConstant code = null; + + if (ex instanceof AMQException) + { + code = ((AMQException) ex).getErrorCode(); + } + throw JMSExceptionHelper.chainJMSException(new IllegalStateException("Session has been closed", - ex.getErrorCode().toString()), ex); + code == null ? null : code.toString()), ex); } else { @@ -529,7 +537,7 @@ public abstract class AMQSession<C exten dest.setRoutingKey(dest.getSubject()); } - protected void verifySubject(AMQDestination dest) throws AMQException + protected void verifySubject(AMQDestination dest) throws QpidException { if (dest.getSubject() == null || dest.getSubject().trim().equals("")) { @@ -547,9 +555,9 @@ public abstract class AMQSession<C exten } } - public abstract boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws AMQException; + public abstract boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws QpidException; - public abstract boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException; + public abstract boolean isQueueExist(AMQDestination dest, boolean assertNode) throws QpidException; /** * 1. Try to resolve the address type (queue or exchange) @@ -566,7 +574,7 @@ public abstract class AMQSession<C exten @SuppressWarnings("deprecation") public void resolveAddress(AMQDestination dest, boolean isConsumer, - boolean noLocal) throws AMQException + boolean noLocal) throws QpidException { if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) { @@ -619,7 +627,7 @@ public abstract class AMQSession<C exten break; default: - throw new AMQException( + throw new QpidException( "The name '" + dest.getAddressName() + "' supplied in the address doesn't resolve to an exchange or a queue"); } @@ -630,7 +638,7 @@ public abstract class AMQSession<C exten } } - public abstract int resolveAddressType(AMQDestination dest) throws AMQException; + public abstract int resolveAddressType(AMQDestination dest) throws QpidException; protected abstract void acknowledgeImpl() throws JMSException; @@ -655,24 +663,24 @@ public abstract class AMQSession<C exten * @param arguments Additional arguments. * @param exchangeName The exchange to bind the queue on. * - * @throws AMQException If the queue cannot be bound for any reason. + * @throws QpidException If the queue cannot be bound for any reason. * TODO Be aware of possible changes to parameter order as versions change. * TODO Document the additional arguments that may be passed in the field table. Are these for headers exchanges? */ public void bindQueue(final String queueName, final String routingKey, final Map<String,Object> arguments, - final String exchangeName, final AMQDestination destination) throws AMQException + final String exchangeName, final AMQDestination destination) throws QpidException { bindQueue(queueName, routingKey, arguments, exchangeName, destination, false); } public void bindQueue(final String queueName, final String routingKey, final Map<String,Object> arguments, final String exchangeName, final AMQDestination destination, - final boolean nowait) throws AMQException + final boolean nowait) throws QpidException { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ - new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() { - public Object execute() throws AMQException, FailoverException + public Object execute() throws QpidException, FailoverException { sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait); return null; @@ -680,7 +688,7 @@ public abstract class AMQSession<C exten }, _connection).execute(); } - public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException + public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws QpidException { if (consumer.getQueuename() != null) { @@ -690,7 +698,7 @@ public abstract class AMQSession<C exten public abstract void sendQueueBind(final String queueName, final String routingKey, final Map<String,Object> arguments, final String exchangeName, AMQDestination destination, - final boolean nowait) throws AMQException, FailoverException; + final boolean nowait) throws QpidException, FailoverException; /** * Closes the session. @@ -749,7 +757,7 @@ public abstract class AMQSession<C exten } } } - catch (AMQException e) + catch (QpidException e) { throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing session: " + e), e); } @@ -771,7 +779,7 @@ public abstract class AMQSession<C exten } } - public abstract void sendClose(long timeout) throws AMQException, FailoverException; + public abstract void sendClose(long timeout) throws QpidException, FailoverException; /** * Called when the server initiates the closure of the session unilaterally. @@ -801,14 +809,14 @@ public abstract class AMQSession<C exten { // An AMQException has an error code and message already and will be passed in when closure occurs as a // result of a channel close request - AMQException amqe; - if (e instanceof AMQException) + QpidException amqe; + if (e instanceof QpidException) { - amqe = (AMQException) e; + amqe = (QpidException) e; } else { - amqe = new AMQException("Closing session forcibly", e); + amqe = new QpidException("Closing session forcibly", e); } _connection.deregisterSession(_channelId); @@ -857,7 +865,7 @@ public abstract class AMQSession<C exten commitImpl(); markClean(); } - catch (AMQException e) + catch (QpidException e) { throw toJMSException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e); } @@ -872,7 +880,7 @@ public abstract class AMQSession<C exten } } - protected abstract void commitImpl() throws AMQException, FailoverException, TransportException; + protected abstract void commitImpl() throws QpidException, FailoverException, TransportException; @@ -1047,7 +1055,7 @@ public abstract class AMQSession<C exten throw new JMSException("Durable subscribers can only be created for Topics"); } } - catch(AMQException e) + catch(QpidException e) { throw toJMSException("Error when verifying destination",e); } @@ -1121,7 +1129,7 @@ public abstract class AMQSession<C exten dest, true); } - catch(AMQException e) + catch(QpidException e) { throw toJMSException("Error when checking binding",e); } @@ -1286,11 +1294,11 @@ public abstract class AMQSession<C exten * @param durable Flag to indicate that the queue is durable. * @param exclusive Flag to indicate that the queue is exclusive to this client. * - * @throws AMQException If the queue cannot be declared for any reason. + * @throws QpidException If the queue cannot be declared for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ public void createQueue(final String name, final boolean autoDelete, final boolean durable, - final boolean exclusive) throws AMQException + final boolean exclusive) throws QpidException { createQueue(name, autoDelete, durable, exclusive, null); } @@ -1306,15 +1314,15 @@ public abstract class AMQSession<C exten * @param exclusive Flag to indicate that the queue is exclusive to this client. * @param arguments Arguments used to set special properties of the queue * - * @throws AMQException If the queue cannot be declared for any reason. + * @throws QpidException If the queue cannot be declared for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ public void createQueue(final String name, final boolean autoDelete, final boolean durable, - final boolean exclusive, final Map<String, Object> arguments) throws AMQException + final boolean exclusive, final Map<String, Object> arguments) throws QpidException { - new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + new FailoverRetrySupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() { - public Object execute() throws AMQException, FailoverException + public Object execute() throws QpidException, FailoverException { sendCreateQueue(name, autoDelete, durable, exclusive, arguments); return null; @@ -1323,7 +1331,8 @@ public abstract class AMQSession<C exten } public abstract void sendCreateQueue(String name, final boolean autoDelete, final boolean durable, - final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException; + final boolean exclusive, final Map<String, Object> arguments) throws + QpidException, FailoverException; /** * Creates a QueueReceiver @@ -1489,7 +1498,7 @@ public abstract class AMQSession<C exten new HashMap<String, Object>(), result.getExchangeName(), result); return result; } - catch (AMQException e) + catch (QpidException e) { throw toJMSException("Cannot create temporary queue",e); } @@ -1566,12 +1575,12 @@ public abstract class AMQSession<C exten } } - public void declareExchange(String name, String type, boolean nowait) throws AMQException + public void declareExchange(String name, String type, boolean nowait) throws QpidException { declareExchange(name, type, nowait, false, false, false); } - abstract public void sync() throws AMQException; + abstract public void sync() throws QpidException; public int getAcknowledgeMode() { @@ -1706,7 +1715,7 @@ public abstract class AMQSession<C exten public void declareAndBind(AMQDestination amqd) throws - AMQException + QpidException { declareAndBind(amqd, new HashMap<String,Object>()); } @@ -1714,7 +1723,7 @@ public abstract class AMQSession<C exten public void declareAndBind(AMQDestination amqd, Map<String,Object> arguments) throws - AMQException + QpidException { declareExchange(amqd, false); String queueName = declareQueue(amqd, false); @@ -1802,7 +1811,7 @@ public abstract class AMQSession<C exten suspendChannel(false); } } - catch (AMQException e) + catch (QpidException e) { throw toJMSException("Recover failed: " + e.getMessage(), e); } @@ -1817,7 +1826,7 @@ public abstract class AMQSession<C exten } } - protected abstract void sendRecover() throws AMQException, FailoverException; + protected abstract void sendRecover() throws QpidException, FailoverException; protected abstract void flushAcknowledgments(); @@ -1889,7 +1898,7 @@ public abstract class AMQSession<C exten suspendChannel(false); } } - catch (AMQException e) + catch (QpidException e) { throw toJMSException("Failed to rollback: " + e, e); } @@ -1907,7 +1916,7 @@ public abstract class AMQSession<C exten public abstract void releaseForRollback(); - public abstract void sendRollback() throws AMQException, FailoverException; + public abstract void sendRollback() throws QpidException, FailoverException; public void run() { @@ -2073,7 +2082,7 @@ public abstract class AMQSession<C exten + amqd.getRoutingKey() ), e); } - catch (AMQException e) + catch (QpidException e) { if (e instanceof AMQChannelClosedException) { @@ -2306,9 +2315,9 @@ public abstract class AMQSession<C exten /** * Resubscribes all producers and consumers. This is called when performing failover. * - * @throws AMQException + * @throws QpidException */ - void resubscribe() throws AMQException + void resubscribe() throws QpidException { if (_dirty) { @@ -2338,12 +2347,12 @@ public abstract class AMQSession<C exten /** * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * - * @throws AMQException If the session cannot be started for any reason. + * @throws QpidException If the session cannot be started for any reason. * TODO This should be controlled by _stopped as it pairs with the stop method fixme or check the * FlowControlledBlockingQueue _queue to see if we have flow controlled. will result in sending Flow messages * for each subsequent call to flow.. only need to do this if we have called stop. */ - void start() throws AMQException + void start() throws QpidException { // Check if the session has previously been started and suspended, in which case it must be unsuspended. if (_startedAtLeastOnce.getAndSet(true)) @@ -2377,7 +2386,7 @@ public abstract class AMQSession<C exten { suspendChannel(false); } - catch (AMQException e) + catch (QpidException e) { _logger.info("Unsuspending channel threw an exception:", e); } @@ -2419,7 +2428,7 @@ public abstract class AMQSession<C exten } } - void stop() throws AMQException + void stop() throws QpidException { // Stop the server delivering messages to this session. if (!(isClosed() || isClosing())) @@ -2578,7 +2587,7 @@ public abstract class AMQSession<C exten * * @param amqe the exception, may be null to indicate no error has occurred */ - private void closeProducersAndConsumers(AMQException amqe) throws JMSException + private void closeProducersAndConsumers(QpidException amqe) throws JMSException { JMSException jmse = null; try @@ -2614,7 +2623,7 @@ public abstract class AMQSession<C exten * Register to consume from the queue. * @param queueName */ - private void consumeFromQueue(C consumer, String queueName, boolean nowait) throws AMQException, FailoverException + private void consumeFromQueue(C consumer, String queueName, boolean nowait) throws QpidException, FailoverException { int tagId = _nextTag++; @@ -2633,7 +2642,7 @@ public abstract class AMQSession<C exten { sendConsume(consumer, queueName, nowait, tagId); } - catch (AMQException e) + catch (QpidException e) { // clean-up the map in the event of an error _consumers.remove(tagId); @@ -2641,13 +2650,13 @@ public abstract class AMQSession<C exten } } - void handleLinkCreation(AMQDestination dest) throws AMQException + void handleLinkCreation(AMQDestination dest) throws QpidException { createBindings(dest, dest.getLink().getBindings()); } - void createBindings(AMQDestination dest, List<AMQDestination.Binding> bindings) throws AMQException + void createBindings(AMQDestination dest, List<AMQDestination.Binding> bindings) throws QpidException { String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest .getAddressName() : "amq.topic"; @@ -2682,15 +2691,15 @@ public abstract class AMQSession<C exten } } - protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException; + protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws QpidException; - abstract void handleExchangeNodeCreation(AMQDestination dest) throws AMQException; + abstract void handleExchangeNodeCreation(AMQDestination dest) throws QpidException; abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange) - throws AMQException; + throws QpidException; public abstract void sendConsume(C consumer, String queueName, - boolean nowait, int tag) throws AMQException, FailoverException; + boolean nowait, int tag) throws QpidException, FailoverException; private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate) throws JMSException @@ -2724,7 +2733,7 @@ public abstract class AMQSession<C exten public abstract P createMessageProducer(final Destination destination, final Boolean mandatory, final Boolean immediate, final long producerId) throws JMSException; - private void declareExchange(AMQDestination amqd, boolean nowait) throws AMQException + private void declareExchange(AMQDestination amqd, boolean nowait) throws QpidException { declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), nowait, amqd.isExchangeDurable(), amqd.isExchangeAutoDelete(), amqd.isExchangeInternal()); @@ -2739,10 +2748,10 @@ public abstract class AMQSession<C exten * * @return the number of queued messages. * - * @throws AMQException If the queue cannot be declared for any reason. + * @throws QpidException If the queue cannot be declared for any reason. */ public long getQueueDepth(final AMQDestination amqd) - throws AMQException + throws QpidException { return getQueueDepth(amqd, false); } @@ -2755,13 +2764,13 @@ public abstract class AMQSession<C exten * @param amqd AMQ destination to get the depth value * @param sync flag to sync session before receiving the queue depth * @return queue depth - * @throws AMQException + * @throws QpidException */ - public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws AMQException + public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws QpidException { - return new FailoverNoopSupport<Long, AMQException>(new FailoverProtectedOperation<Long, AMQException>() + return new FailoverNoopSupport<Long, QpidException>(new FailoverProtectedOperation<Long, QpidException>() { - public Long execute() throws AMQException, FailoverException + public Long execute() throws QpidException, FailoverException { try { @@ -2775,7 +2784,7 @@ public abstract class AMQSession<C exten }, _connection).execute(); } - protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException; + protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws QpidException, FailoverException; /** * Declares the named exchange and type of exchange. @@ -2788,16 +2797,16 @@ public abstract class AMQSession<C exten * @param durable * @param autoDelete * @param internal - * @throws AMQException If the exchange cannot be declared for any reason. + * @throws QpidException If the exchange cannot be declared for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ void declareExchange(final String name, final String type, final boolean nowait, final boolean durable, - final boolean autoDelete, final boolean internal) throws AMQException + final boolean autoDelete, final boolean internal) throws QpidException { - new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() { - public Object execute() throws AMQException, FailoverException + public Object execute() throws QpidException, FailoverException { sendExchangeDeclare(name, type, nowait, durable, autoDelete, internal); return null; @@ -2808,11 +2817,11 @@ public abstract class AMQSession<C exten void declareExchange(final String name, final String type, final boolean nowait, final boolean durable, final boolean autoDelete, final Map<String,Object> arguments, - final boolean passive) throws AMQException + final boolean passive) throws QpidException { - new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() { - public Object execute() throws AMQException, FailoverException + public Object execute() throws QpidException, FailoverException { sendExchangeDeclare(name, type, nowait, durable, autoDelete, arguments, passive); return null; @@ -2821,7 +2830,7 @@ public abstract class AMQSession<C exten } protected String preprocessAddressTopic(final C consumer, - String queueName) throws AMQException + String queueName) throws QpidException { if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) { @@ -2838,10 +2847,12 @@ public abstract class AMQSession<C exten return queueName; } - abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException; + abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws + QpidException; public abstract void sendExchangeDeclare(final String name, final String type, final boolean nowait, - boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException; + boolean durable, boolean autoDelete, boolean internal) throws + QpidException, FailoverException; public abstract void sendExchangeDeclare(final String name, @@ -2850,7 +2861,7 @@ public abstract class AMQSession<C exten boolean durable, boolean autoDelete, Map<String,Object> arguments, - final boolean passive) throws AMQException, FailoverException; + final boolean passive) throws QpidException, FailoverException; /** * Declares a queue for a JMS destination. @@ -2868,25 +2879,26 @@ public abstract class AMQSession<C exten * * * - * @throws AMQException If the queue cannot be declared for any reason. + * @throws QpidException If the queue cannot be declared for any reason. * TODO Verify the destiation is valid or throw an exception. * TODO Be aware of possible changes to parameter order as versions change. */ protected String declareQueue(final AMQDestination amqd, - final boolean noLocal) throws AMQException + final boolean noLocal) throws QpidException { return declareQueue(amqd, noLocal, false); } protected String declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait) - throws AMQException + throws QpidException { return declareQueue(amqd, noLocal, nowait, false); } protected abstract String declareQueue(final AMQDestination amqd, - final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException; + final boolean noLocal, final boolean nowait, final boolean passive) throws + QpidException; /** * Undeclares the specified queue. @@ -2902,16 +2914,16 @@ public abstract class AMQSession<C exten { try { - new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + new FailoverRetrySupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() { - public Object execute() throws AMQException, FailoverException + public Object execute() throws QpidException, FailoverException { sendQueueDelete(queueName); return null; } }, _connection).execute(); } - catch (AMQException e) + catch (QpidException e) { throw toJMSException("The queue deletion failed: " + e.getMessage(), e); } @@ -2932,7 +2944,7 @@ public abstract class AMQSession<C exten deleteQueue(amqQueue.getAMQQueueName()); } - public abstract void sendQueueDelete(final String queueName) throws AMQException, FailoverException; + public abstract void sendQueueDelete(final String queueName) throws QpidException, FailoverException; private long getNextProducerId() { @@ -2991,9 +3003,9 @@ public abstract class AMQSession<C exten * * @param consumer * - * @throws AMQException + * @throws QpidException */ - private void registerConsumer(C consumer, boolean nowait) throws AMQException + private void registerConsumer(C consumer, boolean nowait) throws QpidException { AMQDestination amqd = consumer.getDestination(); @@ -3044,7 +3056,7 @@ public abstract class AMQSession<C exten _logger.debug( "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); } - catch (AMQException e) + catch (QpidException e) { _logger.info("Suspending channel threw an exception:", e); } @@ -3062,12 +3074,12 @@ public abstract class AMQSession<C exten } catch (FailoverException e) { - throw new AMQException(null, "Fail-over exception interrupted basic consume.", e); + throw new QpidException("Fail-over exception interrupted basic consume.", e); } } protected abstract boolean isBound(String exchangeName, String amqQueueName, String routingKey) - throws AMQException; + throws QpidException; private void registerProducer(long producerId, MessageProducer producer) { @@ -3116,7 +3128,7 @@ public abstract class AMQSession<C exten } } - private void resubscribeConsumers() throws AMQException + private void resubscribeConsumers() throws QpidException { ArrayList<C> consumers = new ArrayList<C>(_consumers.values()); _consumers.clear(); @@ -3129,7 +3141,7 @@ public abstract class AMQSession<C exten } } - private void resubscribeProducers() throws AMQException + private void resubscribeProducers() throws QpidException { ArrayList producers = new ArrayList(_producers.values()); _logger.debug(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", @@ -3148,10 +3160,10 @@ public abstract class AMQSession<C exten * @param suspend true indicates that the session should be suspended, false indicates that it * should be unsuspended. * - * @throws AMQException If the session cannot be suspended for any reason. + * @throws QpidException If the session cannot be suspended for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ - protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException + protected void suspendChannel(boolean suspend) throws QpidException // , FailoverException { synchronized (_suspensionLock) { @@ -3167,7 +3179,7 @@ public abstract class AMQSession<C exten } catch (FailoverException e) { - throw new AMQException(null, "Fail-over interrupted suspend/unsuspend channel.", e); + throw new QpidException("Fail-over interrupted suspend/unsuspend channel.", e); } catch (TransportException e) { @@ -3176,7 +3188,7 @@ public abstract class AMQSession<C exten } } - public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException; + public abstract void sendSuspendChannel(boolean suspend) throws QpidException, FailoverException; Object getMessageDeliveryLock() { @@ -3569,7 +3581,7 @@ public abstract class AMQSession<C exten } } } - catch (AMQException e) + catch (QpidException e) { _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: ", e); if (_logger.isDebugEnabled()) @@ -3629,11 +3641,16 @@ public abstract class AMQSession<C exten return code; } - JMSException toJMSException(String message, AMQException e) + JMSException toJMSException(String message, QpidException e) { JMSException ex; + AMQConstant errorCode = null; + + if (e instanceof AMQException) + { + errorCode = ((AMQException) e).getErrorCode(); + } - AMQConstant errorCode = e.getErrorCode(); if (errorCode == AMQConstant.ACCESS_REFUSED) { ex = JMSExceptionHelper.chainJMSException(new JMSSecurityException(message,
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1689876&r1=1689875&r2=1689876&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Jul 8 13:50:16 2015 @@ -40,6 +40,7 @@ import javax.jms.JMSException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.QpidException; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.AMQDestination.DestSyntax; @@ -122,7 +123,7 @@ public class AMQSession_0_10 extends AMQ * The latest qpid Exception that has been raised. */ private Object _currentExceptionLock = new Object(); - private AMQException _currentException; + private QpidException _currentException; // a ref on the qpid connection private org.apache.qpid.transport.Connection _qpidConnection; @@ -326,7 +327,7 @@ public class AMQSession_0_10 extends AMQ public void sendQueueBind(final String queueName, final String routingKey, final Map<String,Object> arguments, final String exchangeName, final AMQDestination destination, final boolean nowait) - throws AMQException + throws QpidException { if (destination == null || destination.getDestSyntax() == DestSyntax.BURL) { @@ -402,10 +403,10 @@ public class AMQSession_0_10 extends AMQ * Close this session. * * @param timeout no used / 0_8 specific - * @throws AMQException + * @throws QpidException * @throws FailoverException */ - public void sendClose(long timeout) throws AMQException, FailoverException + public void sendClose(long timeout) throws QpidException, FailoverException { cancelTimerTask(); flushAcknowledgments(); @@ -419,7 +420,7 @@ public class AMQSession_0_10 extends AMQ setCurrentException(se); } - AMQException amqe = getCurrentException(); + QpidException amqe = getCurrentException(); if (amqe != null) { throw amqe; @@ -436,11 +437,11 @@ public class AMQSession_0_10 extends AMQ * the queue will be marked as durable. * @param exclusive Exclusive queues can only be used from one connection at a time. * @param arguments Exclusive queues can only be used from one connection at a time. - * @throws AMQException + * @throws QpidException * @throws FailoverException */ public void sendCreateQueue(String name, final boolean autoDelete, final boolean durable, - final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException + final boolean exclusive, Map<String, Object> arguments) throws QpidException, FailoverException { getQpidSession().queueDeclare(name, null, arguments, durable ? Option.DURABLE : Option.NONE, autoDelete ? Option.AUTO_DELETE : Option.NONE, @@ -452,10 +453,10 @@ public class AMQSession_0_10 extends AMQ /** * This method asks the broker to redeliver all unacknowledged messages * - * @throws AMQException + * @throws QpidException * @throws FailoverException */ - public void sendRecover() throws AMQException, FailoverException + public void sendRecover() throws QpidException, FailoverException { // release all unacked messages RangeSet all = RangeSetFactory.createRangeSet(); @@ -611,7 +612,7 @@ public class AMQSession_0_10 extends AMQ */ public void sendConsume(BasicMessageConsumer_0_10 consumer, String queueName, boolean nowait, int tag) - throws AMQException, FailoverException + throws QpidException, FailoverException { queueName = preprocessAddressTopic(consumer, queueName); boolean preAcquire = consumer.isPreAcquire(); @@ -671,7 +672,7 @@ public class AMQSession_0_10 extends AMQ return new BasicMessageProducer_0_10(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this, producerId, immediate, mandatory); } - catch (AMQException e) + catch (QpidException e) { throw toJMSException("Error creating producer",e); } @@ -686,14 +687,15 @@ public class AMQSession_0_10 extends AMQ * creates an exchange if it does not already exist */ public void sendExchangeDeclare(final String name, final String type, final boolean nowait, - boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException + boolean durable, boolean autoDelete, boolean internal) throws QpidException, FailoverException { //The 'internal' parameter is ignored on the 0-10 path, the protocol does not support it sendExchangeDeclare(name, type, null, null, nowait, durable, autoDelete); } public void sendExchangeDeclare(final String name, final String type, final boolean nowait, - boolean durable, boolean autoDelete, Map<String,Object> arguments, final boolean passive) throws AMQException, FailoverException + boolean durable, boolean autoDelete, Map<String,Object> arguments, final boolean passive) throws + QpidException, FailoverException { sendExchangeDeclare(name, type, null, arguments, @@ -703,7 +705,7 @@ public class AMQSession_0_10 extends AMQ public void sendExchangeDeclare(final String name, final String type, final String alternateExchange, final Map<String, Object> args, - final boolean nowait, boolean durable, boolean autoDelete) throws AMQException + final boolean nowait, boolean durable, boolean autoDelete) throws QpidException { getQpidSession().exchangeDeclare( name, @@ -724,7 +726,7 @@ public class AMQSession_0_10 extends AMQ * deletes an exchange */ public void sendExchangeDelete(final String name, final boolean nowait) - throws AMQException, FailoverException + throws QpidException, FailoverException { getQpidSession().exchangeDelete(name); // We need to sync so that we get notify of an error. @@ -739,7 +741,7 @@ public class AMQSession_0_10 extends AMQ */ public String send0_10QueueDeclare(final AMQDestination amqd, final boolean noLocal, final boolean nowait, boolean passive) - throws AMQException + throws QpidException { String queueName; if (amqd.getAMQQueueName() == null) @@ -796,7 +798,7 @@ public class AMQSession_0_10 extends AMQ /** * deletes a queue */ - public void sendQueueDelete(final String queueName) throws AMQException, FailoverException + public void sendQueueDelete(final String queueName) throws QpidException, FailoverException { getQpidSession().queueDelete(queueName); // ifEmpty --> false @@ -808,7 +810,7 @@ public class AMQSession_0_10 extends AMQ /** * Activate/deactivate the message flow for all the consumers of this session. */ - public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException + public void sendSuspendChannel(boolean suspend) throws QpidException, FailoverException { if (suspend) { @@ -860,7 +862,7 @@ public class AMQSession_0_10 extends AMQ } - public void sendRollback() throws AMQException, FailoverException + public void sendRollback() throws QpidException, FailoverException { getQpidSession().txRollback(); // We need to sync so that we get notify of an error. @@ -884,9 +886,9 @@ public class AMQSession_0_10 extends AMQ * * @throws SessionException get the latest thrown error. */ - public AMQException getCurrentException() + public QpidException getCurrentException() { - AMQException amqe = null; + QpidException amqe = null; synchronized (_currentExceptionLock) { if (_currentException != null) @@ -931,7 +933,7 @@ public class AMQSession_0_10 extends AMQ } } - public AMQException getLastException() + public QpidException getLastException() { return getCurrentException(); } @@ -939,12 +941,12 @@ public class AMQSession_0_10 extends AMQ @Override protected String declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait, final boolean passive) - throws AMQException + throws QpidException { - return new FailoverNoopSupport<String, AMQException>( - new FailoverProtectedOperation<String, AMQException>() + return new FailoverNoopSupport<String, QpidException>( + new FailoverProtectedOperation<String, QpidException>() { - public String execute() throws AMQException, FailoverException + public String execute() throws QpidException, FailoverException { // Generate the queue name if the destination indicates that a client generated name is to be used. if (amqd.isNameRequired()) @@ -998,7 +1000,7 @@ public class AMQSession_0_10 extends AMQ } } - public void commitImpl() throws AMQException, FailoverException, TransportException + public void commitImpl() throws QpidException, FailoverException, TransportException { if( _txSize > 0 ) { @@ -1030,7 +1032,7 @@ public class AMQSession_0_10 extends AMQ return Serial.lt((int) currentMark, (int) deliveryTag); } - public void sync() throws AMQException + public void sync() throws QpidException { try { @@ -1041,7 +1043,7 @@ public class AMQSession_0_10 extends AMQ setCurrentException(se); } - AMQException amqe = getCurrentException(); + QpidException amqe = getCurrentException(); if (amqe != null) { throw amqe; @@ -1058,7 +1060,7 @@ public class AMQSession_0_10 extends AMQ { code = ee.getErrorCode().getValue(); } - AMQException amqe = new AMQException(AMQConstant.getConstant(code), _isHardError, se.getMessage(), se.getCause()); + QpidException amqe = new AMQException(AMQConstant.getConstant(code), _isHardError, se.getMessage(), se.getCause()); _currentException = amqe; } if (!_isHardError) @@ -1083,7 +1085,7 @@ public class AMQSession_0_10 extends AMQ } @Override - public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws QpidException { boolean match = true; ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); @@ -1111,7 +1113,7 @@ public class AMQSession_0_10 extends AMQ { if (!match) { - throw new AMQException("Assert failed for address : " + dest +", Result was : " + result); + throw new QpidException("Assert failed for address : " + dest +", Result was : " + result); } } @@ -1119,7 +1121,7 @@ public class AMQSession_0_10 extends AMQ } @Override - public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws QpidException { Node node = dest.getNode(); return isQueueExist(dest.getAddressName(), assertNode, @@ -1129,7 +1131,7 @@ public class AMQSession_0_10 extends AMQ public boolean isQueueExist(String queueName, boolean assertNode, boolean durable, boolean autoDelete, - boolean exclusive, Map<String, Object> args) throws AMQException + boolean exclusive, Map<String, Object> args) throws QpidException { boolean match = true; try @@ -1149,7 +1151,7 @@ public class AMQSession_0_10 extends AMQ { if (!match) { - throw new AMQException("Assert failed for queue : " + queueName +", Result was : " + result); + throw new QpidException("Assert failed for queue : " + queueName +", Result was : " + result); } } } @@ -1195,7 +1197,7 @@ public class AMQSession_0_10 extends AMQ } @Override - public int resolveAddressType(AMQDestination dest) throws AMQException + public int resolveAddressType(AMQDestination dest) throws QpidException { int type = dest.getAddressType(); String name = dest.getAddressName(); @@ -1217,7 +1219,7 @@ public class AMQSession_0_10 extends AMQ type = AMQDestination.TOPIC_TYPE; } else { //both a queue and exchange exist for that name - throw new AMQException("Ambiguous address, please specify queue or topic as node type"); + throw new QpidException("Ambiguous address, please specify queue or topic as node type"); } dest.setAddressType(type); return type; @@ -1225,7 +1227,7 @@ public class AMQSession_0_10 extends AMQ } @Override - void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException + void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws QpidException { Link link = dest.getLink(); String queueName = dest.getQueueName(); @@ -1245,7 +1247,7 @@ public class AMQSession_0_10 extends AMQ if (link.isDurable() && queueName.startsWith("TempQueue")) { - throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link."); + throw new QpidException("You cannot mark a subscription queue as durable without providing a name for the link."); } getQpidSession().queueDeclare(queueName, @@ -1274,7 +1276,7 @@ public class AMQSession_0_10 extends AMQ } @Override - void resubscribe() throws AMQException + void resubscribe() throws QpidException { // Also reset the delivery tag tracker, to insure we dont // return the first <total number of msgs received on session> @@ -1292,7 +1294,7 @@ public class AMQSession_0_10 extends AMQ } @Override - void stop() throws AMQException + void stop() throws QpidException { super.stop(); setUsingDispatcherForCleanup(true); @@ -1351,7 +1353,7 @@ public class AMQSession_0_10 extends AMQ } @Override - protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException + protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws QpidException { Node node = dest.getNode(); Map<String,Object> arguments = node.getDeclareArgs(); @@ -1370,7 +1372,7 @@ public class AMQSession_0_10 extends AMQ } @Override - void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + void handleExchangeNodeCreation(AMQDestination dest) throws QpidException { Node node = dest.getNode(); sendExchangeDeclare(dest.getAddressName(), @@ -1395,7 +1397,7 @@ public class AMQSession_0_10 extends AMQ binding.getArgs()); } - void handleLinkDelete(AMQDestination dest) throws AMQException + void handleLinkDelete(AMQDestination dest) throws QpidException { // We need to destroy link bindings String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest @@ -1432,7 +1434,7 @@ public class AMQSession_0_10 extends AMQ } } - void deleteSubscriptionQueue(AMQDestination dest) throws AMQException + void deleteSubscriptionQueue(AMQDestination dest) throws QpidException { // We need to delete the subscription queue. if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && @@ -1444,7 +1446,7 @@ public class AMQSession_0_10 extends AMQ } @Override - void handleNodeDelete(AMQDestination dest) throws AMQException + void handleNodeDelete(AMQDestination dest) throws QpidException { if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) { Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1689876&r1=1689875&r2=1689876&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Wed Jul 8 13:50:16 2015 @@ -41,6 +41,7 @@ import javax.jms.JMSException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.QpidException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQUndeliveredException; import org.apache.qpid.client.failover.FailoverException; @@ -133,7 +134,7 @@ public class AMQSession_0_8 extends AMQS { reduceCreditAfterAcknowledge(); } - catch (AMQException e) + catch (QpidException e) { throw JMSExceptionHelper.chainJMSException(new JMSException("Session.reduceCreditAfterAcknowledge failed"), e); @@ -157,7 +158,7 @@ public class AMQSession_0_8 extends AMQS sync(); } } - catch (AMQException a) + catch (QpidException a) { throw JMSExceptionHelper.chainJMSException(new JMSException("Failed to sync after acknowledge"), a); } @@ -180,7 +181,7 @@ public class AMQSession_0_8 extends AMQS public void sendQueueBind(final String queueName, final String routingKey, final Map<String,Object> arguments, final String exchangeName, final AMQDestination destination, - final boolean nowait) throws AMQException, FailoverException + final boolean nowait) throws QpidException, FailoverException { if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL) { @@ -224,7 +225,7 @@ public class AMQSession_0_8 extends AMQS } } - public void sendClose(long timeout) throws AMQException, FailoverException + public void sendClose(long timeout) throws QpidException, FailoverException { // we also need to check the state manager for 08/09 as the // _connection variable may not be updated in time by the error receiving @@ -248,7 +249,7 @@ public class AMQSession_0_8 extends AMQS } } - public void commitImpl() throws AMQException, FailoverException, TransportException + public void commitImpl() throws QpidException, FailoverException, TransportException { // Acknowledge all delivered messages while (true) @@ -268,13 +269,14 @@ public class AMQSession_0_8 extends AMQS _currentPrefetch.set(0); } - public void sendCreateQueue(String name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException, + public void sendCreateQueue(String name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws + QpidException, FailoverException { sendQueueDeclare(name, durable, exclusive, autoDelete, arguments, false); } - public void sendRecover() throws AMQException, FailoverException + public void sendRecover() throws QpidException, FailoverException { enforceRejectBehaviourDuringRecover(); getPrefetchedMessageTags().clear(); @@ -405,10 +407,10 @@ public class AMQSession_0_8 extends AMQS { try { - AMQMethodEvent response = new FailoverRetrySupport<AMQMethodEvent, AMQException>( - new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + AMQMethodEvent response = new FailoverRetrySupport<AMQMethodEvent, QpidException>( + new FailoverProtectedOperation<AMQMethodEvent, QpidException>() { - public AMQMethodEvent execute() throws AMQException, FailoverException + public AMQMethodEvent execute() throws QpidException, FailoverException { return sendExchangeBound(exchangeName, routingKey, queueName); @@ -420,7 +422,7 @@ public class AMQSession_0_8 extends AMQS return (responseBody.getReplyCode() == 0); } - catch (AMQException e) + catch (QpidException e) { throw JMSExceptionHelper.chainJMSException(new JMSException("Queue bound query failed: " + e.getMessage()), e); @@ -434,17 +436,17 @@ public class AMQSession_0_8 extends AMQS */ @Override protected boolean isBound(final String exchangeName, final String queueName, final String routingKey) - throws AMQException + throws QpidException { if(!getAMQConnection().getDelegate().supportsIsBound()) { return false; } - AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>( - new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, QpidException>( + new FailoverProtectedOperation<AMQMethodEvent, QpidException>() { - public AMQMethodEvent execute() throws AMQException, FailoverException + public AMQMethodEvent execute() throws QpidException, FailoverException { return sendExchangeBound(exchangeName, routingKey, queueName); @@ -459,17 +461,17 @@ public class AMQSession_0_8 extends AMQS protected boolean exchangeExists(final String exchangeName) - throws AMQException + throws QpidException { if(!getAMQConnection().getDelegate().supportsIsBound()) { return false; } - AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>( - new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, QpidException>( + new FailoverProtectedOperation<AMQMethodEvent, QpidException>() { - public AMQMethodEvent execute() throws AMQException, FailoverException + public AMQMethodEvent execute() throws QpidException, FailoverException { return sendExchangeBound(exchangeName, null, null); @@ -485,7 +487,7 @@ public class AMQSession_0_8 extends AMQS private AMQMethodEvent sendExchangeBound(String exchangeName, String routingKey, - String queueName) throws AMQException, FailoverException + String queueName) throws QpidException, FailoverException { AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody (exchangeName, routingKey, queueName).generateFrame(getChannelId()); @@ -497,7 +499,7 @@ public class AMQSession_0_8 extends AMQS public void sendConsume(BasicMessageConsumer_0_8 consumer, String queueName, boolean nowait, - int tag) throws AMQException, FailoverException + int tag) throws QpidException, FailoverException { queueName = preprocessAddressTopic(consumer, queueName); @@ -534,7 +536,7 @@ public class AMQSession_0_8 extends AMQS } @Override - void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException + void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws QpidException { final Link link = dest.getLink(); final String queueName ; @@ -558,13 +560,13 @@ public class AMQSession_0_8 extends AMQS if (link.isDurable() && queueName.startsWith("TempQueue")) { - throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link."); + throw new QpidException("You cannot mark a subscription queue as durable without providing a name for the link."); } - (new FailoverNoopSupport<Void, AMQException>( - new FailoverProtectedOperation<Void, AMQException>() + (new FailoverNoopSupport<Void, QpidException>( + new FailoverProtectedOperation<Void, QpidException>() { - public Void execute() throws AMQException, FailoverException + public Void execute() throws QpidException, FailoverException { // not setting alternate exchange @@ -590,7 +592,7 @@ public class AMQSession_0_8 extends AMQS @Override public void sendExchangeDeclare(final String name, final String type, final boolean nowait, - boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException + boolean durable, boolean autoDelete, boolean internal) throws QpidException, FailoverException { //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path @@ -607,7 +609,7 @@ public class AMQSession_0_8 extends AMQS @Override public void sendExchangeDeclare(final String name, final String type, final boolean nowait, boolean durable, boolean autoDelete, Map<String,Object> arguments, - final boolean passive) throws AMQException, FailoverException + final boolean passive) throws QpidException, FailoverException { //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path @@ -626,7 +628,7 @@ public class AMQSession_0_8 extends AMQS getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } - public void sendExchangeDelete(final String name) throws AMQException, FailoverException + public void sendExchangeDelete(final String name) throws QpidException, FailoverException { ExchangeDeleteBody body = getMethodRegistry().createExchangeDeleteBody(getTicket(), name, false, false); @@ -635,7 +637,7 @@ public class AMQSession_0_8 extends AMQS getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } - private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException + private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws QpidException, FailoverException { String queueName = amqd.getAMQQueueName(); boolean durable = amqd.isDurable(); @@ -648,7 +650,7 @@ public class AMQSession_0_8 extends AMQS final boolean durable, final boolean exclusive, final boolean autoDelete, final Map<String,Object> arguments, final boolean passive) - throws AMQException, FailoverException + throws QpidException, FailoverException { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(), @@ -667,15 +669,15 @@ public class AMQSession_0_8 extends AMQS @Override protected String declareQueue(final AMQDestination amqd, final boolean noLocal, - final boolean nowait, final boolean passive) throws AMQException + final boolean nowait, final boolean passive) throws QpidException { //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new FailoverNoopSupport<String, AMQException>( - new FailoverProtectedOperation<String, AMQException>() + return new FailoverNoopSupport<String, QpidException>( + new FailoverProtectedOperation<String, QpidException>() { - public String execute() throws AMQException, FailoverException + public String execute() throws QpidException, FailoverException { // Generate the queue name if the destination indicates that a client generated name is to be used. if (amqd.isNameRequired()) @@ -690,7 +692,7 @@ public class AMQSession_0_8 extends AMQS }, getAMQConnection()).execute(); } - public void sendQueueDelete(final String queueName) throws AMQException, FailoverException + public void sendQueueDelete(final String queueName) throws QpidException, FailoverException { QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(), queueName, @@ -702,7 +704,7 @@ public class AMQSession_0_8 extends AMQS getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } - public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException + public void sendSuspendChannel(boolean suspend) throws QpidException, FailoverException { ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend); AMQFrame channelFlowFrame = body.generateFrame(getChannelId()); @@ -727,7 +729,7 @@ public class AMQSession_0_8 extends AMQS return new BasicMessageProducer_0_8(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(), this, getProtocolHandler(), producerId, immediate, mandatory); } - catch (AMQException e) + catch (QpidException e) { throw JMSExceptionHelper.chainJMSException(new JMSException("Error creating producer"), e); } @@ -796,7 +798,7 @@ public class AMQSession_0_8 extends AMQS - public void sendRollback() throws AMQException, FailoverException + public void sendRollback() throws QpidException, FailoverException { TxRollbackBody body = getMethodRegistry().createTxRollbackBody(); AMQFrame frame = body.generateFrame(getChannelId()); @@ -804,7 +806,7 @@ public class AMQSession_0_8 extends AMQS } public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) - throws AMQException, FailoverException + throws QpidException, FailoverException { _currentPrefetch.set(0); if(messagePrefetch > 0 || sizePrefetch > 0) @@ -818,12 +820,12 @@ public class AMQSession_0_8 extends AMQS - protected boolean ensureCreditForReceive() throws AMQException + protected boolean ensureCreditForReceive() throws QpidException { return new FailoverNoopSupport<>( - new FailoverProtectedOperation<Boolean, AMQException>() + new FailoverProtectedOperation<Boolean, QpidException>() { - public Boolean execute() throws AMQException, FailoverException + public Boolean execute() throws QpidException, FailoverException { int currentPrefetch = _currentPrefetch.get(); if (currentPrefetch >= getPrefetch() && getPrefetch() >= 0) @@ -849,16 +851,16 @@ public class AMQSession_0_8 extends AMQS } - protected void reduceCreditAfterAcknowledge() throws AMQException + protected void reduceCreditAfterAcknowledge() throws QpidException { boolean manageCredit = isManagingCredit(); if(manageCredit && _creditChanged.compareAndSet(true,false)) { new FailoverNoopSupport<>( - new FailoverProtectedOperation<Void, AMQException>() + new FailoverProtectedOperation<Void, QpidException>() { - public Void execute() throws AMQException, FailoverException + public Void execute() throws QpidException, FailoverException { int prefetch = getPrefetch(); if(prefetch == 0) @@ -945,7 +947,7 @@ public class AMQSession_0_8 extends AMQS } } - protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException + protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws QpidException, FailoverException { if(_useLegacyQueueDepthBehaviour || isBound(null, amqd.getAMQQueueName(), null)) @@ -984,14 +986,14 @@ public class AMQSession_0_8 extends AMQS return AMQMessageDelegateFactory.FACTORY_0_8; } - public void sync() throws AMQException + public void sync() throws QpidException { declareExchange("amq.direct", "direct", false); } @Override public void resolveAddress(final AMQDestination dest, final boolean isConsumer, final boolean noLocal) - throws AMQException + throws QpidException { if(!isAddrSyntaxSupported()) { @@ -1005,7 +1007,7 @@ public class AMQSession_0_8 extends AMQS return ((AMQConnectionDelegate_8_0)(getAMQConnection().getDelegate())).isAddrSyntaxSupported(); } - public int resolveAddressType(AMQDestination dest) throws AMQException + public int resolveAddressType(AMQDestination dest) throws QpidException { int type = dest.getAddressType(); String name = dest.getAddressName(); @@ -1035,14 +1037,14 @@ public class AMQSession_0_8 extends AMQS else { //both a queue and exchange exist for that name - throw new AMQException("Ambiguous address, please specify queue or topic as node type"); + throw new QpidException("Ambiguous address, please specify queue or topic as node type"); } dest.setAddressType(type); return type; } } - protected void handleQueueNodeCreation(final AMQDestination dest, boolean noLocal) throws AMQException + protected void handleQueueNodeCreation(final AMQDestination dest, boolean noLocal) throws QpidException { final Node node = dest.getNode(); final Map<String,Object> arguments = node.getDeclareArgs(); @@ -1056,10 +1058,10 @@ public class AMQSession_0_8 extends AMQS arguments.put("alternateExchange", altExchange); } - (new FailoverNoopSupport<Void, AMQException>( - new FailoverProtectedOperation<Void, AMQException>() + (new FailoverNoopSupport<Void, QpidException>( + new FailoverProtectedOperation<Void, QpidException>() { - public Void execute() throws AMQException, FailoverException + public Void execute() throws QpidException, FailoverException { sendQueueDeclare(dest.getAddressName(), @@ -1078,7 +1080,7 @@ public class AMQSession_0_8 extends AMQS sync(); } - void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + void handleExchangeNodeCreation(AMQDestination dest) throws QpidException { Node node = dest.getNode(); String altExchange = dest.getNode().getAlternateExchange(); @@ -1107,13 +1109,13 @@ public class AMQSession_0_8 extends AMQS protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, - final String exchange) throws AMQException + final String exchange) throws QpidException { final String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); - new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() { - public Object execute() throws AMQException, FailoverException + public Object execute() throws QpidException, FailoverException { @@ -1137,11 +1139,11 @@ public class AMQSession_0_8 extends AMQS protected void doUnbind(final AMQDestination.Binding binding, final String queue, - final String exchange) throws AMQException + final String exchange) throws QpidException { - new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() { - public Object execute() throws AMQException, FailoverException + public Object execute() throws QpidException, FailoverException { if (isBound(null, queue, null)) @@ -1173,7 +1175,7 @@ public class AMQSession_0_8 extends AMQS }, getAMQConnection()).execute(); } - public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws QpidException { Node node = dest.getNode(); return isQueueExist(dest.getAddressName(), assertNode, @@ -1183,7 +1185,7 @@ public class AMQSession_0_8 extends AMQS public boolean isQueueExist(final String queueName, boolean assertNode, final boolean durable, final boolean autoDelete, - final boolean exclusive, final Map<String, Object> args) throws AMQException + final boolean exclusive, final Map<String, Object> args) throws QpidException { boolean match = isBound(null, queueName, null); @@ -1191,16 +1193,16 @@ public class AMQSession_0_8 extends AMQS { if(!match) { - throw new AMQException("Assert failed for queue : " + queueName +". Queue does not exist." ); + throw new QpidException("Assert failed for queue : " + queueName +". Queue does not exist." ); } else { - new FailoverNoopSupport<Void, AMQException>( - new FailoverProtectedOperation<Void, AMQException>() + new FailoverNoopSupport<Void, QpidException>( + new FailoverProtectedOperation<Void, QpidException>() { - public Void execute() throws AMQException, FailoverException + public Void execute() throws QpidException, FailoverException { sendQueueDeclare(queueName, @@ -1221,7 +1223,7 @@ public class AMQSession_0_8 extends AMQS return match; } - public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws QpidException { boolean match = exchangeExists(dest.getAddressName()); @@ -1256,7 +1258,7 @@ public class AMQSession_0_8 extends AMQS { if (!match) { - throw new AMQException("Assert failed for address : " + dest +". Exchange not found."); + throw new QpidException("Assert failed for address : " + dest +". Exchange not found."); } } @@ -1264,16 +1266,16 @@ public class AMQSession_0_8 extends AMQS } @Override - void handleNodeDelete(final AMQDestination dest) throws AMQException + void handleNodeDelete(final AMQDestination dest) throws QpidException { if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) { if (isExchangeExist(dest,false)) { - new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() { - public Object execute() throws AMQException, FailoverException + public Object execute() throws QpidException, FailoverException { sendExchangeDelete(dest.getAddressName()); return null; @@ -1287,9 +1289,9 @@ public class AMQSession_0_8 extends AMQS if (isQueueExist(dest,false)) { - new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + new FailoverNoopSupport<Object, QpidException>(new FailoverProtectedOperation<Object, QpidException>() { - public Object execute() throws AMQException, FailoverException + public Object execute() throws QpidException, FailoverException { sendQueueDelete(dest.getAddressName()); return null; @@ -1301,7 +1303,7 @@ public class AMQSession_0_8 extends AMQS } @Override - void handleLinkDelete(AMQDestination dest) throws AMQException + void handleLinkDelete(AMQDestination dest) throws QpidException { // We need to destroy link bindings String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest @@ -1338,17 +1340,17 @@ public class AMQSession_0_8 extends AMQS } - void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException + void deleteSubscriptionQueue(final AMQDestination dest) throws QpidException { // We need to delete the subscription queue. if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && dest.getLink().getSubscriptionQueue().isExclusive() && isQueueExist(dest.getQueueName(), false, false, false, false, null)) { - (new FailoverNoopSupport<Void, AMQException>( - new FailoverProtectedOperation<Void, AMQException>() + (new FailoverNoopSupport<Void, QpidException>( + new FailoverProtectedOperation<Void, QpidException>() { - public Void execute() throws AMQException, FailoverException + public Void execute() throws QpidException, FailoverException { sendQueueDelete(dest.getQueueName()); @@ -1395,7 +1397,7 @@ public class AMQSession_0_8 extends AMQS return methodRegistry; } - public AMQException getLastException() + public QpidException getLastException() { // if the Connection has closed then we should throw any exception that // has occurred that we were not waiting for @@ -1405,16 +1407,15 @@ public class AMQSession_0_8 extends AMQS if (manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) && e != null) { - if (e instanceof AMQException) + if (e instanceof QpidException) { - return (AMQException) e; + return (QpidException) e; } else { - AMQException amqe = new AMQException(AMQConstant - .getConstant(AMQConstant.INTERNAL_ERROR.getCode()), - e.getMessage(), e.getCause()); - return amqe; + return new AMQException(AMQConstant.getConstant(AMQConstant.INTERNAL_ERROR.getCode()), + e.getMessage(), e.getCause()); + } } else Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1689876&r1=1689875&r2=1689876&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Jul 8 13:50:16 2015 @@ -41,7 +41,7 @@ import javax.jms.MessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.AMQException; +import org.apache.qpid.QpidException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.filter.JMSSelectorFilter; @@ -619,7 +619,7 @@ public abstract class BasicMessageConsum } } } - catch (AMQException e) + catch (QpidException e) { throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing consumer: " + e.getMessage()), e); } @@ -665,7 +665,7 @@ public abstract class BasicMessageConsum } } - abstract void sendCancel() throws AMQException, FailoverException; + abstract void sendCancel() throws QpidException, FailoverException; /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has @@ -1028,7 +1028,7 @@ public abstract class BasicMessageConsum this._queuename = queuename; } - public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException + public void addBindingKey(AMQDestination amqd, String routingKey) throws QpidException { _session.addBindingKey(this,amqd,routingKey); } Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1689876&r1=1689875&r2=1689876&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Wed Jul 8 13:50:16 2015 @@ -28,6 +28,7 @@ import javax.jms.MessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.QpidException; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.message.AMQMessageDelegateFactory; @@ -151,7 +152,7 @@ public class BasicMessageConsumer_0_10 e } } } - catch (AMQException e) + catch (QpidException e) { _logger.error("Receivecd an Exception when receiving message",e); getSession().getAMQConnection().exceptionReceived(e); @@ -162,7 +163,7 @@ public class BasicMessageConsumer_0_10 e * This method is invoked when this consumer is stopped. * It tells the broker to stop delivering messages to this consumer. */ - @Override void sendCancel() throws AMQException + @Override void sendCancel() throws QpidException { _0_10session.getQpidSession().messageCancel(getConsumerTagString()); postSubscription(); @@ -176,7 +177,7 @@ public class BasicMessageConsumer_0_10 e _0_10session.setCurrentException(se); } - AMQException amqe = _0_10session.getCurrentException(); + QpidException amqe = _0_10session.getCurrentException(); if (amqe != null) { throw amqe; @@ -216,9 +217,9 @@ public class BasicMessageConsumer_0_10 e * * @param message The message to be checked. * @return true if the message matches the selector and can be acquired, false otherwise. - * @throws AMQException If the message preConditions cannot be checked due to some internal error. + * @throws QpidException If the message preConditions cannot be checked due to some internal error. */ - private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException + private boolean checkPreConditions(AbstractJMSMessage message) throws QpidException { boolean messageOk = true; try @@ -280,15 +281,15 @@ public class BasicMessageConsumer_0_10 e * Acknowledge a message * * @param message The message to be acknowledged - * @throws AMQException If the message cannot be acquired due to some internal error. + * @throws QpidException If the message cannot be acquired due to some internal error. */ - private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException + private void acknowledgeMessage(final AbstractJMSMessage message) throws QpidException { _0_10session.messageAcknowledge (Range.newInstance((int) message.getDeliveryTag()), getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); - final AMQException amqe = _0_10session.getCurrentException(); + final QpidException amqe = _0_10session.getCurrentException(); if (amqe != null) { throw amqe; @@ -300,13 +301,13 @@ public class BasicMessageConsumer_0_10 e * processed to ensure their AMQP command-id is marked completed. * * @param message The unwanted message to be flushed - * @throws AMQException If the unwanted message cannot be flushed due to some internal error. + * @throws QpidException If the unwanted message cannot be flushed due to some internal error. */ - private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException + private void flushUnwantedMessage(final AbstractJMSMessage message) throws QpidException { _0_10session.flushProcessed(Range.newInstance((int) message.getDeliveryTag()),false); - final AMQException amqe = _0_10session.getCurrentException(); + final QpidException amqe = _0_10session.getCurrentException(); if (amqe != null) { throw amqe; @@ -318,9 +319,9 @@ public class BasicMessageConsumer_0_10 e * * @param message The message to be acquired * @return true if the message has been acquired, false otherwise. - * @throws AMQException If the message cannot be acquired due to some internal error. + * @throws QpidException If the message cannot be acquired due to some internal error. */ - private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException + private boolean acquireMessage(final AbstractJMSMessage message) throws QpidException { boolean result = false; @@ -483,7 +484,7 @@ public class BasicMessageConsumer_0_10 e } - void postSubscription() throws AMQException + void postSubscription() throws QpidException { AMQDestination dest = this.getDestination(); if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) @@ -579,7 +580,7 @@ public class BasicMessageConsumer_0_10 e } return message; } - catch (AMQException e) + catch (QpidException e) { throw JMSExceptionHelper.chainJMSException(new JMSException("BasicMessageConsumer.receive failed"), e); } @@ -613,7 +614,7 @@ public class BasicMessageConsumer_0_10 e } return message; } - catch (AMQException e) + catch (QpidException e) { throw JMSExceptionHelper.chainJMSException(new JMSException("BasicMessageConsumer.receiveNoWait failed."), e); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
