Do some cleanup and renaming to make things more consistent. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ec13a0d5 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ec13a0d5 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ec13a0d5
Branch: refs/heads/master Commit: ec13a0d50f121a71280f9f074a1a6e94e61bbfd1 Parents: 1ec8e2c Author: Timothy Bish <tabish...@gmail.com> Authored: Thu Oct 16 18:52:38 2014 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Thu Oct 16 18:52:38 2014 -0400 ---------------------------------------------------------------------- .../jms/provider/amqp/AbstractAmqpResource.java | 242 ------------------- .../jms/provider/amqp/AmqpAbstractResource.java | 240 ++++++++++++++++++ .../qpid/jms/provider/amqp/AmqpConnection.java | 14 +- .../qpid/jms/provider/amqp/AmqpConsumer.java | 40 +-- .../jms/provider/amqp/AmqpFixedProducer.java | 6 +- .../qpid/jms/provider/amqp/AmqpProducer.java | 6 +- .../jms/provider/amqp/AmqpQueueBrowser.java | 6 +- .../qpid/jms/provider/amqp/AmqpSession.java | 18 +- .../provider/amqp/AmqpTemporaryDestination.java | 20 +- .../provider/amqp/AmqpTransactionContext.java | 6 +- 10 files changed, 298 insertions(+), 300 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java deleted file mode 100644 index 1175b34..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java +++ /dev/null @@ -1,242 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.provider.amqp; - -import java.io.IOException; - -import javax.jms.JMSException; -import javax.jms.JMSSecurityException; - -import org.apache.qpid.jms.meta.JmsResource; -import org.apache.qpid.jms.provider.AsyncResult; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.transport.AmqpError; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Endpoint; -import org.apache.qpid.proton.engine.EndpointState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Abstract base for all AmqpResource implementations to extend. - * - * This abstract class wraps up the basic state management bits so that the concrete - * object don't have to reproduce it. Provides hooks for the subclasses to initialize - * and shutdown. - */ -public abstract class AbstractAmqpResource<R extends JmsResource, E extends Endpoint> implements AmqpResource { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractAmqpResource.class); - - protected AsyncResult openRequest; - protected AsyncResult closeRequest; - - protected E endpoint; - protected R info; - - /** - * Creates a new AbstractAmqpResource instance with the JmsResource provided, and - * sets the Endpoint to null. - * - * @param info - * The JmsResource instance that this AmqpResource is managing. - */ - public AbstractAmqpResource(R info) { - this(info, null); - } - - /** - * Creates a new AbstractAmqpResource instance with the JmsResource provided, and - * sets the Endpoint to the given value. - * - * @param info - * The JmsResource instance that this AmqpResource is managing. - * @param endpoint - * The Proton Endpoint instance that this object maps to. - */ - public AbstractAmqpResource(R info, E endpoint) { - this.info = info; - this.endpoint = endpoint; - } - - @Override - public void open(AsyncResult request) { - this.openRequest = request; - doOpen(); - this.endpoint.setContext(this); - this.endpoint.open(); - } - - @Override - public boolean isOpen() { - return this.endpoint.getRemoteState() == EndpointState.ACTIVE; - } - - @Override - public boolean isAwaitingOpen() { - return this.openRequest != null; - } - - @Override - public void opened() { - if (this.openRequest != null) { - this.openRequest.onSuccess(); - this.openRequest = null; - } - } - - @Override - public void close(AsyncResult request) { - // If already closed signal success or else the caller might never get notified. - if (endpoint.getLocalState() == EndpointState.CLOSED) { - request.onSuccess(); - return; - } - - this.closeRequest = request; - doClose(); - this.endpoint.close(); - } - - @Override - public boolean isClosed() { - return this.endpoint.getLocalState() == EndpointState.CLOSED; - } - - @Override - public boolean isAwaitingClose() { - return this.closeRequest != null; - } - - @Override - public void closed() { - if (this.closeRequest != null) { - this.closeRequest.onSuccess(); - this.closeRequest = null; - } - - this.endpoint.close(); - this.endpoint.free(); - } - - @Override - public void failed() { - failed(new JMSException("Remote request failed.")); - } - - @Override - public void failed(Exception cause) { - if (openRequest != null) { - openRequest.onFailure(cause); - openRequest = null; - } - - if (closeRequest != null) { - closeRequest.onFailure(cause); - closeRequest = null; - } - } - - public E getEndpoint() { - return this.endpoint; - } - - public R getJmsResource() { - return this.info; - } - - public EndpointState getLocalState() { - if (endpoint == null) { - return EndpointState.UNINITIALIZED; - } - return this.endpoint.getLocalState(); - } - - public EndpointState getRemoteState() { - if (endpoint == null) { - return EndpointState.UNINITIALIZED; - } - return this.endpoint.getRemoteState(); - } - - @Override - public Exception getRemoteError() { - String message = getRemoteErrorMessage(); - Exception remoteError = null; - Symbol error = endpoint.getRemoteCondition().getCondition(); - if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) { - remoteError = new JMSSecurityException(message); - } else { - remoteError = new JMSException(message); - } - - return remoteError; - } - - @Override - public String getRemoteErrorMessage() { - String message = "Received unkown error from remote peer"; - if (endpoint.getRemoteCondition() != null) { - ErrorCondition error = endpoint.getRemoteCondition(); - if (error.getDescription() != null && !error.getDescription().isEmpty()) { - message = error.getDescription(); - } - } - - return message; - } - - @Override - public void processStateChange() throws IOException { - EndpointState remoteState = endpoint.getRemoteState(); - - if (remoteState == EndpointState.ACTIVE) { - if (isAwaitingOpen()) { - LOG.debug("{} is now open: ", this); - opened(); - } - - // Should not receive an ACTIVE event if not awaiting the open state. - } else if (remoteState == EndpointState.CLOSED) { - if (isAwaitingClose()) { - LOG.debug("{} is now closed: ", this); - closed(); - } else if (isAwaitingOpen()) { - // Error on Open, create exception and signal failure. - LOG.warn("Open of {} failed: ", this); - Exception remoteError = this.getRemoteError(); - failed(remoteError); - } else { - // TODO - Handle remote asynchronous close. - LOG.warn("{} was closed remotely.", this); - } - } - } - - @Override - public void processDeliveryUpdates() throws IOException { - } - - @Override - public void processFlowUpdates() throws IOException { - } - - protected abstract void doOpen(); - - protected abstract void doClose(); - -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java new file mode 100644 index 0000000..83249c2 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import java.io.IOException; + +import javax.jms.JMSException; +import javax.jms.JMSSecurityException; + +import org.apache.qpid.jms.meta.JmsResource; +import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.EndpointState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base for all AmqpResource implementations to extend. + * + * This abstract class wraps up the basic state management bits so that the concrete + * object don't have to reproduce it. Provides hooks for the subclasses to initialize + * and shutdown. + */ +public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endpoint> implements AmqpResource { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class); + + protected AsyncResult openRequest; + protected AsyncResult closeRequest; + + protected E endpoint; + protected R resource; + + /** + * Creates a new instance with the JmsResource provided, and sets the Endpoint to null. + * + * @param resource + * The JmsResource instance that this AmqpResource is managing. + */ + public AmqpAbstractResource(R resource) { + this(resource, null); + } + + /** + * Creates a new instance with the JmsResource provided, and sets the Endpoint to the given value. + * + * @param resource + * The JmsResource instance that this AmqpResource is managing. + * @param endpoint + * The Proton Endpoint instance that this object maps to. + */ + public AmqpAbstractResource(R resource, E endpoint) { + this.resource = resource; + this.endpoint = endpoint; + } + + @Override + public void open(AsyncResult request) { + this.openRequest = request; + doOpen(); + this.endpoint.setContext(this); + this.endpoint.open(); + } + + @Override + public boolean isOpen() { + return this.endpoint.getRemoteState() == EndpointState.ACTIVE; + } + + @Override + public boolean isAwaitingOpen() { + return this.openRequest != null; + } + + @Override + public void opened() { + if (this.openRequest != null) { + this.openRequest.onSuccess(); + this.openRequest = null; + } + } + + @Override + public void close(AsyncResult request) { + // If already closed signal success or else the caller might never get notified. + if (endpoint.getLocalState() == EndpointState.CLOSED) { + request.onSuccess(); + return; + } + + this.closeRequest = request; + doClose(); + this.endpoint.close(); + } + + @Override + public boolean isClosed() { + return this.endpoint.getLocalState() == EndpointState.CLOSED; + } + + @Override + public boolean isAwaitingClose() { + return this.closeRequest != null; + } + + @Override + public void closed() { + if (this.closeRequest != null) { + this.closeRequest.onSuccess(); + this.closeRequest = null; + } + + this.endpoint.close(); + this.endpoint.free(); + } + + @Override + public void failed() { + failed(new JMSException("Remote request failed.")); + } + + @Override + public void failed(Exception cause) { + if (openRequest != null) { + openRequest.onFailure(cause); + openRequest = null; + } + + if (closeRequest != null) { + closeRequest.onFailure(cause); + closeRequest = null; + } + } + + public E getEndpoint() { + return this.endpoint; + } + + public R getJmsResource() { + return this.resource; + } + + public EndpointState getLocalState() { + if (endpoint == null) { + return EndpointState.UNINITIALIZED; + } + return this.endpoint.getLocalState(); + } + + public EndpointState getRemoteState() { + if (endpoint == null) { + return EndpointState.UNINITIALIZED; + } + return this.endpoint.getRemoteState(); + } + + @Override + public Exception getRemoteError() { + String message = getRemoteErrorMessage(); + Exception remoteError = null; + Symbol error = endpoint.getRemoteCondition().getCondition(); + if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) { + remoteError = new JMSSecurityException(message); + } else { + remoteError = new JMSException(message); + } + + return remoteError; + } + + @Override + public String getRemoteErrorMessage() { + String message = "Received unkown error from remote peer"; + if (endpoint.getRemoteCondition() != null) { + ErrorCondition error = endpoint.getRemoteCondition(); + if (error.getDescription() != null && !error.getDescription().isEmpty()) { + message = error.getDescription(); + } + } + + return message; + } + + @Override + public void processStateChange() throws IOException { + EndpointState remoteState = endpoint.getRemoteState(); + + if (remoteState == EndpointState.ACTIVE) { + if (isAwaitingOpen()) { + LOG.debug("{} is now open: ", this); + opened(); + } + + // Should not receive an ACTIVE event if not awaiting the open state. + } else if (remoteState == EndpointState.CLOSED) { + if (isAwaitingClose()) { + LOG.debug("{} is now closed: ", this); + closed(); + } else if (isAwaitingOpen()) { + // Error on Open, create exception and signal failure. + LOG.warn("Open of {} failed: ", this); + Exception remoteError = this.getRemoteError(); + failed(remoteError); + } else { + // TODO - Handle remote asynchronous close. + LOG.warn("{} was closed remotely.", this); + } + } + } + + @Override + public void processDeliveryUpdates() throws IOException { + } + + @Override + public void processFlowUpdates() throws IOException { + } + + protected abstract void doOpen(); + + protected abstract void doClose(); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index f3d09b0..d9a52da 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -36,7 +36,7 @@ import org.apache.qpid.proton.engine.Sasl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Connection> { +public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Connection> { private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); @@ -71,7 +71,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn this.authenticator = new AmqpSaslAuthenticator(sasl, info); } - this.info.getConnectionId().setProviderHint(this); + this.resource.getConnectionId().setProviderHint(this); this.queuePrefix = info.getQueuePrefix(); this.topicPrefix = info.getTopicPrefix(); @@ -80,7 +80,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn // Create a Session for this connection that is used for Temporary Destinations // and perhaps later on management and advisory monitoring. - JmsSessionInfo sessionInfo = new JmsSessionInfo(this.info, -1); + JmsSessionInfo sessionInfo = new JmsSessionInfo(this.resource, -1); sessionInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE); this.connectionSession = new AmqpConnectionSession(this, sessionInfo); @@ -88,7 +88,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn @Override protected void doOpen() { - this.endpoint.setContainer(info.getClientId()); + this.endpoint.setContainer(resource.getClientId()); this.endpoint.setHostname(remoteURI.getHost()); } @@ -199,7 +199,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn } public JmsConnectionInfo getConnectionInfo() { - return this.info; + return this.resource; } public Connection getProtonConnection() { @@ -211,11 +211,11 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn } public String getUsername() { - return this.info.getUsername(); + return this.resource.getUsername(); } public String getPassword() { - return this.info.getPassword(); + return this.resource.getPassword(); } public AmqpProvider getProvider() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 0af029b..3caec3c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory; /** * AMQP Consumer object that is used to manage JMS MessageConsumer semantics. */ -public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver> { +public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver> { private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class); @@ -81,20 +81,20 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver this.session = session; // Add a shortcut back to this Consumer for quicker lookups - this.info.getConsumerId().setProviderHint(this); + this.resource.getConsumerId().setProviderHint(this); } /** * Starts the consumer by setting the link credit to the given prefetch value. */ public void start(AsyncResult request) { - this.endpoint.flow(info.getPrefetchSize()); + this.endpoint.flow(resource.getPrefetchSize()); request.onSuccess(); } @Override protected void doOpen() { - JmsDestination destination = info.getDestination(); + JmsDestination destination = resource.getDestination(); String subscription = session.getQualifiedName(destination); Source source = new Source(); @@ -104,10 +104,10 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver configureSource(source); String receiverName = getConsumerId() + ":" + subscription; - if (info.getSubscriptionName() != null && !info.getSubscriptionName().isEmpty()) { + if (resource.getSubscriptionName() != null && !resource.getSubscriptionName().isEmpty()) { // In the case of Durable Topic Subscriptions the client must use the same // receiver name which is derived from the subscription name property. - receiverName = info.getSubscriptionName(); + receiverName = resource.getSubscriptionName(); } endpoint = session.getProtonSession().receiver(receiverName); @@ -136,7 +136,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver protected void configureSource(Source source) { Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>(); - if (info.getSubscriptionName() != null && !info.getSubscriptionName().isEmpty()) { + if (resource.getSubscriptionName() != null && !resource.getSubscriptionName().isEmpty()) { source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); source.setDurable(TerminusDurability.UNSETTLED_STATE); source.setDistributionMode(COPY); @@ -145,12 +145,12 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); } - if (info.isNoLocal()) { + if (resource.isNoLocal()) { filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL); } - if (info.getSelector() != null && !info.getSelector().trim().equals("")) { - filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(info.getSelector())); + if (resource.getSelector() != null && !resource.getSelector().trim().equals("")) { + filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(resource.getSelector())); } if (!filters.isEmpty()) { @@ -168,7 +168,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver * would already have been given for these so we just need to settle them. */ public void acknowledge() { - LOG.trace("Session Acknowledge for consumer: {}", info.getConsumerId()); + LOG.trace("Session Acknowledge for consumer: {}", resource.getConsumerId()); for (Delivery delivery : delivered.values()) { delivery.disposition(Accepted.getInstance()); delivery.settle(); @@ -246,13 +246,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver * then we open the window back up to full prefetch size. */ private void sendFlowIfNeeded() { - if (info.getPrefetchSize() == 0) { + if (resource.getPrefetchSize() == 0) { return; } int currentCredit = endpoint.getCredit(); - if (currentCredit <= info.getPrefetchSize() * 0.2) { - endpoint.flow(info.getPrefetchSize() - currentCredit); + if (currentCredit <= resource.getPrefetchSize() * 0.2) { + endpoint.flow(resource.getPrefetchSize() - currentCredit); } } @@ -262,7 +262,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver * @throws Exception if an error occurs while performing the recover. */ public void recover() throws Exception { - LOG.debug("Session Recover for consumer: {}", info.getConsumerId()); + LOG.debug("Session Recover for consumer: {}", resource.getConsumerId()); for (Delivery delivery : delivered.values()) { // TODO - increment redelivery counter and apply connection redelivery policy // to those messages that are past max redlivery. @@ -281,7 +281,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver * @param timeout */ public void pull(long timeout) { - if (info.getPrefetchSize() == 0 && endpoint.getCredit() == 0) { + if (resource.getPrefetchSize() == 0 && endpoint.getCredit() == 0) { // expand the credit window by one. endpoint.flow(1); } @@ -329,7 +329,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber()); envelope.setMessage(message); - envelope.setConsumerId(info.getConsumerId()); + envelope.setConsumerId(resource.getConsumerId()); // Store link to delivery in the hint for use in acknowledge requests. envelope.setProviderHint(incoming); envelope.setMessageId(message.getFacade().getProviderMessageIdObject()); @@ -357,11 +357,11 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver } public JmsConsumerId getConsumerId() { - return this.info.getConsumerId(); + return this.resource.getConsumerId(); } public JmsDestination getDestination() { - return this.info.getDestination(); + return this.resource.getDestination(); } public Receiver getProtonReceiver() { @@ -382,7 +382,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver @Override public String toString() { - return "AmqpConsumer { " + this.info.getConsumerId() + " }"; + return "AmqpConsumer { " + this.resource.getConsumerId() + " }"; } protected void deliveryFailed(Delivery incoming, boolean expandCredit) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 3afc492..56ed79f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -235,8 +235,8 @@ public class AmqpFixedProducer extends AmqpProducer { protected void doOpen() { String targetAddress; - if (info.getDestination() != null) { - JmsDestination destination = info.getDestination(); + if (resource.getDestination() != null) { + JmsDestination destination = resource.getDestination(); targetAddress = session.getQualifiedName(destination); } else { targetAddress = connection.getProperties().getAnonymousRelayName(); @@ -274,7 +274,7 @@ public class AmqpFixedProducer extends AmqpProducer { @Override public boolean isAnonymous() { - return this.info.getDestination() == null; + return this.resource.getDestination() == null; } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java index 81ebf9e..473f184 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java @@ -29,7 +29,7 @@ import org.apache.qpid.proton.engine.Sender; /** * Base class for Producer instances. */ -public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo, Sender> { +public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo, Sender> { protected final AmqpSession session; protected final AmqpConnection connection; @@ -41,7 +41,7 @@ public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo, this.connection = session.getConnection(); // Add a shortcut back to this Producer for quicker lookup. - this.info.getProducerId().setProviderHint(this); + this.resource.getProducerId().setProviderHint(this); } /** @@ -69,7 +69,7 @@ public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo, * @return the JmsProducerId that was assigned to this AmqpProducer. */ public JmsProducerId getProducerId() { - return this.info.getProducerId(); + return this.resource.getProducerId(); } /** http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java index 10e165b..6434bbd 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java @@ -46,7 +46,7 @@ public class AmqpQueueBrowser extends AmqpConsumer { */ @Override public void start(AsyncResult request) { - this.endpoint.flow(info.getPrefetchSize()); + this.endpoint.flow(resource.getPrefetchSize()); request.onSuccess(); } @@ -66,7 +66,7 @@ public class AmqpQueueBrowser extends AmqpConsumer { public void pull(long timeout) { if (!endpoint.getDrain() && endpoint.current() == null && endpoint.getUnsettled() == 0) { LOG.trace("QueueBrowser {} will try to drain remote.", getConsumerId()); - this.endpoint.drain(info.getPrefetchSize()); + this.endpoint.drain(resource.getPrefetchSize()); } else { endpoint.setDrain(false); } @@ -113,7 +113,7 @@ public class AmqpQueueBrowser extends AmqpConsumer { @Override protected void configureSource(Source source) { - if (info.isBrowser()) { + if (resource.isBrowser()) { source.setDistributionMode(COPY); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index 10519f3..a671ffc 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -34,7 +34,7 @@ import org.apache.qpid.proton.engine.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> { +public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> { private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class); @@ -48,8 +48,8 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> { super(info, connection.getProtonConnection().session()); this.connection = connection; - this.info.getSessionId().setProviderHint(this); - if (this.info.isTransacted()) { + this.resource.getSessionId().setProviderHint(this); + if (this.resource.isTransacted()) { txContext = new AmqpTransactionContext(this); } else { txContext = null; @@ -165,7 +165,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> { * @throws Exception if an error occurs while performing the operation. */ public void begin(JmsTransactionId txId, AsyncResult request) throws Exception { - if (!this.info.isTransacted()) { + if (!this.resource.isTransacted()) { throw new IllegalStateException("Non-transacted Session cannot start a TX."); } @@ -181,7 +181,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> { * @throws Exception if an error occurs while performing the operation. */ public void commit(AsyncResult request) throws Exception { - if (!this.info.isTransacted()) { + if (!this.resource.isTransacted()) { throw new IllegalStateException("Non-transacted Session cannot start a TX."); } @@ -197,7 +197,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> { * @throws Exception if an error occurs while performing the operation. */ public void rollback(AsyncResult request) throws Exception { - if (!this.info.isTransacted()) { + if (!this.resource.isTransacted()) { throw new IllegalStateException("Non-transacted Session cannot start a TX."); } @@ -256,7 +256,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> { } public JmsSessionId getSessionId() { - return this.info.getSessionId(); + return this.resource.getSessionId(); } public Session getProtonSession() { @@ -264,11 +264,11 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> { } boolean isTransacted() { - return this.info.isTransacted(); + return this.resource.isTransacted(); } boolean isAsyncAck() { - return this.info.isSendAcksAsync() || isTransacted(); + return this.resource.isSendAcksAsync() || isTransacted(); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java index 4107e3e..1376874 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; * the broker in the case where the user does not have authorization to access temporary * destinations. */ -public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestination, Sender> { +public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestination, Sender> { private static final Logger LOG = LoggerFactory.getLogger(AmqpTemporaryDestination.class); @@ -58,10 +58,10 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio EndpointState remoteState = endpoint.getRemoteState(); if (remoteState == EndpointState.ACTIVE) { - LOG.trace("Temporary Destination: {} is now open", this.info); + LOG.trace("Temporary Destination: {} is now open", this.resource); opened(); } else if (remoteState == EndpointState.CLOSED) { - LOG.trace("Temporary Destination: {} is now closed", this.info); + LOG.trace("Temporary Destination: {} is now closed", this.resource); closed(); } } @@ -70,12 +70,12 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio public void opened() { // Once our producer is opened we can read the updated name from the target address. - String oldDestinationName = info.getName(); + String oldDestinationName = resource.getName(); String destinationName = this.endpoint.getRemoteTarget().getAddress(); - this.info.setName(destinationName); + this.resource.setName(destinationName); - LOG.trace("Updated temp destination to: {} from: {}", info, oldDestinationName); + LOG.trace("Updated temp destination to: {} from: {}", resource, oldDestinationName); super.opened(); } @@ -83,8 +83,8 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio @Override protected void doOpen() { - String sourceAddress = info.getName(); - if (info.isQueue()) { + String sourceAddress = resource.getName(); + if (resource.isQueue()) { sourceAddress = connection.getTempQueuePrefix() + sourceAddress; } else { // TODO - AMQ doesn't support temp topics so we make everything a temp queue for now @@ -123,11 +123,11 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio } public JmsDestination getJmsDestination() { - return this.info; + return this.resource; } @Override public String toString() { - return getClass().getSimpleName() + " { " + info + "}"; + return getClass().getSimpleName() + " { " + resource + "}"; } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java index 1cc6fd1..6bfbaa5 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java @@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory; * The Transaction will carry a JmsTransactionId while the Transaction is open, once a * transaction has been committed or rolled back the Transaction Id is cleared. */ -public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo, Sender> { +public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, Sender> { private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class); @@ -72,7 +72,7 @@ public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo, * * @param session * The session that owns this transaction - * @param info + * @param resource * The JmsTransactionInfo that defines this Transaction. */ public AmqpTransactionContext(AmqpSession session) { @@ -134,7 +134,7 @@ public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo, coordinator.setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_TXNS_PER_SSN); Source source = new Source(); - String coordinatorName = info.getSessionId().toString(); + String coordinatorName = resource.getSessionId().toString(); endpoint = session.getProtonSession().sender(coordinatorName); endpoint.setSource(source); endpoint.setTarget(coordinator); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org