encapsulate the endpoint field
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f2b3a5da Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f2b3a5da Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f2b3a5da Branch: refs/heads/master Commit: f2b3a5da8217e7dcd228ae8ee7ce32253b0780e4 Parents: da94884 Author: Robert Gemmell <[email protected]> Authored: Mon Nov 10 15:21:03 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Mon Nov 10 17:48:37 2014 +0000 ---------------------------------------------------------------------- .../jms/provider/amqp/AmqpAbstractResource.java | 42 +++++++++++--------- .../qpid/jms/provider/amqp/AmqpConnection.java | 16 ++++---- .../provider/amqp/AmqpConnectionSession.java | 7 ++-- .../qpid/jms/provider/amqp/AmqpConsumer.java | 39 +++++++++--------- .../jms/provider/amqp/AmqpFixedProducer.java | 32 ++++++++------- .../jms/provider/amqp/AmqpQueueBrowser.java | 20 +++++----- .../qpid/jms/provider/amqp/AmqpSession.java | 4 +- .../provider/amqp/AmqpTemporaryDestination.java | 19 +++++---- .../provider/amqp/AmqpTransactionContext.java | 25 +++++++----- 9 files changed, 112 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index 6def062..933683a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -45,7 +45,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp protected AsyncResult openRequest; protected AsyncResult closeRequest; - protected E endpoint; + private E endpoint; protected R resource; /** @@ -68,19 +68,19 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp */ public AmqpAbstractResource(R resource, E endpoint) { this.resource = resource; - this.endpoint = endpoint; + setEndpoint(endpoint); } @Override public void open(AsyncResult request) { this.openRequest = request; doOpen(); - this.endpoint.setContext(this); + getEndpoint().setContext(this); } @Override public boolean isOpen() { - return this.endpoint.getRemoteState() == EndpointState.ACTIVE; + return getEndpoint().getRemoteState() == EndpointState.ACTIVE; } @Override @@ -99,7 +99,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp @Override public void close(AsyncResult request) { // If already closed signal success or else the caller might never get notified. - if (endpoint.getLocalState() == EndpointState.CLOSED) { + if (getEndpoint().getLocalState() == EndpointState.CLOSED) { request.onSuccess(); return; } @@ -110,7 +110,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp @Override public boolean isClosed() { - return this.endpoint.getLocalState() == EndpointState.CLOSED; + return getEndpoint().getLocalState() == EndpointState.CLOSED; } @Override @@ -120,8 +120,8 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp @Override public void closed() { - this.endpoint.close(); - this.endpoint.free(); + getEndpoint().close(); + getEndpoint().free(); if (this.closeRequest != null) { this.closeRequest.onSuccess(); @@ -166,34 +166,38 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp return this.endpoint; } + public void setEndpoint(E endpoint) { + this.endpoint = endpoint; + } + public R getJmsResource() { return this.resource; } public EndpointState getLocalState() { - if (endpoint == null) { + if (getEndpoint() == null) { return EndpointState.UNINITIALIZED; } - return this.endpoint.getLocalState(); + return getEndpoint().getLocalState(); } public EndpointState getRemoteState() { - if (endpoint == null) { + if (getEndpoint() == null) { return EndpointState.UNINITIALIZED; } - return this.endpoint.getRemoteState(); + return getEndpoint().getRemoteState(); } @Override public boolean hasRemoteError() { - return endpoint.getRemoteCondition().getCondition() != null; + return getEndpoint().getRemoteCondition().getCondition() != null; } @Override public Exception getRemoteError() { String message = getRemoteErrorMessage(); Exception remoteError = null; - Symbol error = endpoint.getRemoteCondition().getCondition(); + Symbol error = getEndpoint().getRemoteCondition().getCondition(); if (error != null) { if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) { remoteError = new JMSSecurityException(message); @@ -208,8 +212,8 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp @Override public String getRemoteErrorMessage() { String message = "Received unkown error from remote peer"; - if (endpoint.getRemoteCondition() != null) { - ErrorCondition error = endpoint.getRemoteCondition(); + if (getEndpoint().getRemoteCondition() != null) { + ErrorCondition error = getEndpoint().getRemoteCondition(); if (error.getDescription() != null && !error.getDescription().isEmpty()) { message = error.getDescription(); } @@ -220,7 +224,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp @Override public void processStateChange() throws IOException { - EndpointState remoteState = endpoint.getRemoteState(); + EndpointState remoteState = getEndpoint().getRemoteState(); if (remoteState == EndpointState.ACTIVE) { if (isAwaitingOpen()) { @@ -258,7 +262,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp * updates. */ protected void doOpen() { - endpoint.open(); + getEndpoint().open(); } /** @@ -267,6 +271,6 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp * standard close path such as endpoint detach etc. */ protected void doClose() { - endpoint.close(); + getEndpoint().close(); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index 8d5a458..c500d74 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -94,8 +94,8 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn @Override protected void doOpen() { - this.endpoint.setContainer(resource.getClientId()); - this.endpoint.setHostname(remoteURI.getHost()); + getEndpoint().setContainer(resource.getClientId()); + getEndpoint().setHostname(remoteURI.getHost()); super.doOpen(); } @@ -132,7 +132,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn connected = true; this.properties = new AmqpConnectionProperties( - endpoint.getRemoteOfferedCapabilities(), endpoint.getRemoteProperties()); + getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties()); connectionSession.open(new AsyncResult() { @@ -155,14 +155,14 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn }); } - EndpointState localState = endpoint.getLocalState(); - EndpointState remoteState = endpoint.getRemoteState(); + EndpointState localState = getEndpoint().getLocalState(); + EndpointState remoteState = getEndpoint().getRemoteState(); // We are still active (connected or not) and something on the remote end has // closed us, signal an error if one was sent. if (localState == EndpointState.ACTIVE && remoteState != EndpointState.ACTIVE) { - if (endpoint.getRemoteCondition().getCondition() != null) { - LOG.info("Error condition detected on Connection open {}.", endpoint.getRemoteCondition().getCondition()); + if (getEndpoint().getRemoteCondition().getCondition() != null) { + LOG.info("Error condition detected on Connection open {}.", getEndpoint().getRemoteCondition().getCondition()); Exception remoteError = getRemoteError(); if (isAwaitingOpen()) { openRequest.onFailure(remoteError); @@ -214,7 +214,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn } public Connection getProtonConnection() { - return this.endpoint; + return this.getEndpoint(); } public URI getRemoteURI() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java index 3cfc0c6..e08d1b9 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java @@ -81,9 +81,10 @@ public class AmqpConnectionSession extends AmqpSession { @Override protected void doOpen() { - endpoint.setTarget(new Target()); - endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); - endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + Receiver receiver = getEndpoint(); + receiver.setTarget(new Target()); + receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); super.doOpen(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index cb766bf..1a9be73 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -88,7 +88,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver * Starts the consumer by setting the link credit to the given prefetch value. */ public void start(AsyncResult request) { - this.endpoint.flow(resource.getPrefetchSize()); + getEndpoint().flow(resource.getPrefetchSize()); request.onSuccess(); } @@ -110,15 +110,18 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver receiverName = resource.getSubscriptionName(); } - endpoint = session.getProtonSession().receiver(receiverName); - endpoint.setSource(source); - endpoint.setTarget(target); + Receiver receiver = session.getProtonSession().receiver(receiverName); + receiver.setSource(source); + receiver.setTarget(target); if (isPresettle()) { - endpoint.setSenderSettleMode(SenderSettleMode.SETTLED); + receiver.setSenderSettleMode(SenderSettleMode.SETTLED); } else { - endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); + receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); } - endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + setEndpoint(receiver); + super.doOpen(); } @@ -251,9 +254,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver return; } - int currentCredit = endpoint.getCredit(); + int currentCredit = getEndpoint().getCredit(); if (currentCredit <= resource.getPrefetchSize() * 0.2) { - endpoint.flow(resource.getPrefetchSize() - currentCredit); + getEndpoint().flow(resource.getPrefetchSize() - currentCredit); } } @@ -282,9 +285,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver * @param timeout */ public void pull(long timeout) { - if (resource.getPrefetchSize() == 0 && endpoint.getCredit() == 0) { + if (resource.getPrefetchSize() == 0 && getEndpoint().getCredit() == 0) { // expand the credit window by one. - endpoint.flow(1); + getEndpoint().flow(1); } } @@ -292,7 +295,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver public void processDeliveryUpdates() throws IOException { Delivery incoming = null; do { - incoming = endpoint.current(); + incoming = getEndpoint().current(); if (incoming != null && incoming.isReadable() && !incoming.isPartial()) { LOG.trace("{} has incoming Message(s).", this); try { @@ -300,7 +303,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } catch (Exception e) { throw IOExceptionSupport.create(e); } - endpoint.advance(); + getEndpoint().advance(); } else { LOG.trace("{} has a partial incoming Message(s), deferring.", this); incoming = null; @@ -348,9 +351,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver @Override protected void doClose() { if (resource.isDurable()) { - this.endpoint.detach(); + getEndpoint().detach(); } else { - this.endpoint.close(); + getEndpoint().close(); } } @@ -371,7 +374,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } public Receiver getProtonReceiver() { - return this.endpoint; + return this.getEndpoint(); } public boolean isBrowser() { @@ -398,7 +401,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver incoming.disposition(disposition); incoming.settle(); if (expandCredit) { - endpoint.flow(1); + getEndpoint().flow(1); } } @@ -420,7 +423,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver protected Message decodeIncomingMessage(Delivery incoming) { int count; - while ((count = endpoint.recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) { + while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) { incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count); if (!incomingBuffer.isWritable()) { incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5)); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 7eb6683..8cc5a74 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -86,7 +86,7 @@ public class AmqpFixedProducer extends AmqpProducer { // TODO - Handle the case where remote has no credit which means we can't send to it. // We need to hold the send until remote credit becomes available but we should // also have a send timeout option and filter timed out sends. - if (endpoint.getCredit() <= 0) { + if (getEndpoint().getCredit() <= 0) { LOG.trace("Holding Message send until credit is available."); // Once a message goes into a held mode we no longer can send it async, so // we clear the async flag if set to avoid the sender never getting notified. @@ -108,9 +108,9 @@ public class AmqpFixedProducer extends AmqpProducer { Delivery delivery = null; if (presettle) { - delivery = endpoint.delivery(EMPTY_BYTE_ARRAY, 0, 0); + delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0); } else { - delivery = endpoint.delivery(tag, 0, tag.length); + delivery = getEndpoint().delivery(tag, 0, tag.length); } delivery.setContext(request); @@ -129,7 +129,7 @@ public class AmqpFixedProducer extends AmqpProducer { delivery.settle(); } else { pending.add(delivery); - endpoint.advance(); + getEndpoint().advance(); } if (envelope.isSendAsync() || presettle) { @@ -152,7 +152,7 @@ public class AmqpFixedProducer extends AmqpProducer { int sentSoFar = 0; while (true) { - int sent = endpoint.send(encodeBuffer, sentSoFar, encodedSize - sentSoFar); + int sent = getEndpoint().send(encodeBuffer, sentSoFar, encodedSize - sentSoFar); if (sent > 0) { sentSoFar += sent; if ((encodedSize - sentSoFar) == 0) { @@ -166,8 +166,8 @@ public class AmqpFixedProducer extends AmqpProducer { @Override public void processFlowUpdates() throws IOException { - if (!pendingSends.isEmpty() && endpoint.getCredit() > 0) { - while (endpoint.getCredit() > 0 && !pendingSends.isEmpty()) { + if (!pendingSends.isEmpty() && getEndpoint().getCredit() > 0) { + while (getEndpoint().getCredit() > 0 && !pendingSends.isEmpty()) { LOG.trace("Dispatching previously held send"); PendingSend held = pendingSends.pop(); try { @@ -248,15 +248,19 @@ public class AmqpFixedProducer extends AmqpProducer { target.setAddress(targetAddress); String senderName = sourceAddress + ":" + targetAddress; - endpoint = session.getProtonSession().sender(senderName); - endpoint.setSource(source); - endpoint.setTarget(target); + + Sender sender = session.getProtonSession().sender(senderName); + sender.setSource(source); + sender.setTarget(target); if (presettle) { - endpoint.setSenderSettleMode(SenderSettleMode.SETTLED); + sender.setSenderSettleMode(SenderSettleMode.SETTLED); } else { - endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); + sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); } - endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + setEndpoint(sender); + super.doOpen(); } @@ -265,7 +269,7 @@ public class AmqpFixedProducer extends AmqpProducer { } public Sender getProtonSender() { - return this.endpoint; + return this.getEndpoint(); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java index 6434bbd..5daeb02 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java @@ -46,7 +46,7 @@ public class AmqpQueueBrowser extends AmqpConsumer { */ @Override public void start(AsyncResult request) { - this.endpoint.flow(resource.getPrefetchSize()); + getEndpoint().flow(resource.getPrefetchSize()); request.onSuccess(); } @@ -64,17 +64,17 @@ public class AmqpQueueBrowser extends AmqpConsumer { */ @Override public void pull(long timeout) { - if (!endpoint.getDrain() && endpoint.current() == null && endpoint.getUnsettled() == 0) { + if (!getEndpoint().getDrain() && getEndpoint().current() == null && getEndpoint().getUnsettled() == 0) { LOG.trace("QueueBrowser {} will try to drain remote.", getConsumerId()); - this.endpoint.drain(resource.getPrefetchSize()); + getEndpoint().drain(resource.getPrefetchSize()); } else { - endpoint.setDrain(false); + getEndpoint().setDrain(false); } } @Override public void processFlowUpdates() throws IOException { - if (endpoint.getDrain() && endpoint.getCredit() == endpoint.getRemoteCredit()) { + if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit()) { JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber()); browseDone.setConsumerId(getConsumerId()); try { @@ -83,7 +83,7 @@ public class AmqpQueueBrowser extends AmqpConsumer { throw IOExceptionSupport.create(e); } } else { - endpoint.setDrain(false); + getEndpoint().setDrain(false); } super.processFlowUpdates(); @@ -91,14 +91,14 @@ public class AmqpQueueBrowser extends AmqpConsumer { @Override public void processDeliveryUpdates() throws IOException { - if (endpoint.getDrain() && endpoint.current() != null) { + if (getEndpoint().getDrain() && getEndpoint().current() != null) { LOG.trace("{} incoming delivery, cancel drain.", getConsumerId()); - endpoint.setDrain(false); + getEndpoint().setDrain(false); } super.processDeliveryUpdates(); - if (endpoint.getDrain() && endpoint.getCredit() == endpoint.getRemoteCredit()) { + if (getEndpoint().getDrain() && getEndpoint().getCredit() == getEndpoint().getRemoteCredit()) { JmsInboundMessageDispatch browseDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber()); browseDone.setConsumerId(getConsumerId()); try { @@ -107,7 +107,7 @@ public class AmqpQueueBrowser extends AmqpConsumer { throw IOExceptionSupport.create(e); } } else { - endpoint.setDrain(false); + getEndpoint().setDrain(false); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index 5f7ad07..3ffd792 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -67,7 +67,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> { @Override protected void doOpen() { - this.endpoint.setIncomingCapacity(Integer.MAX_VALUE); + this.getEndpoint().setIncomingCapacity(Integer.MAX_VALUE); this.connection.addSession(this); super.doOpen(); } @@ -292,7 +292,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> { } public Session getProtonSession() { - return this.endpoint; + return this.getEndpoint(); } boolean isTransacted() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java index 2cf1cb9..fca448d 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java @@ -58,7 +58,7 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio // TODO - We might want to check on our producer to see if it becomes closed // which might indicate that the broker purged the temporary destination. - EndpointState remoteState = endpoint.getRemoteState(); + EndpointState remoteState = getEndpoint().getRemoteState(); if (remoteState == EndpointState.ACTIVE) { LOG.trace("Temporary Destination: {} is now open", this.resource); opened(); @@ -73,7 +73,7 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio // Once our producer is opened we can read the updated name from the target address. String oldDestinationName = resource.getName(); - String destinationName = this.endpoint.getRemoteTarget().getAddress(); + String destinationName = getEndpoint().getRemoteTarget().getAddress(); this.resource.setName(destinationName); @@ -100,11 +100,14 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); String senderName = sourceAddress; - endpoint = session.getProtonSession().sender(senderName); - endpoint.setSource(source); - endpoint.setTarget(target); - endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); - endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + Sender sender = session.getProtonSession().sender(senderName); + sender.setSource(source); + sender.setTarget(target); + sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + setEndpoint(sender); this.connection.addTemporaryDestination(this); @@ -127,7 +130,7 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestinatio } public Sender getProtonSender() { - return this.endpoint; + return getEndpoint(); } public JmsDestination getJmsDestination() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2b3a5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java index 5492976..a138c97 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java @@ -135,11 +135,15 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, Source source = new Source(); String coordinatorName = resource.getSessionId().toString(); - endpoint = session.getProtonSession().sender(coordinatorName); - endpoint.setSource(source); - endpoint.setTarget(coordinator); - endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); - endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + Sender sender = session.getProtonSession().sender(coordinatorName); + sender.setSource(source); + sender.setTarget(coordinator); + sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + setEndpoint(sender); + super.doOpen(); } @@ -152,7 +156,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, Declare declare = new Declare(); message.setBody(new AmqpValue(declare)); - pendingDelivery = endpoint.delivery(tagGenerator.getNextTag()); + pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag()); pendingRequest = request; current = txId; @@ -172,7 +176,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, discharge.setTxnId((Binary) current.getProviderHint()); message.setBody(new AmqpValue(discharge)); - pendingDelivery = endpoint.delivery(tagGenerator.getNextTag()); + pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag()); pendingDelivery.setContext(COMMIT_MARKER); pendingRequest = request; @@ -192,7 +196,7 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, discharge.setTxnId((Binary) current.getProviderHint()); message.setBody(new AmqpValue(discharge)); - pendingDelivery = endpoint.delivery(tagGenerator.getNextTag()); + pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag()); pendingDelivery.setContext(ROLLBACK_MARKER); pendingRequest = request; @@ -260,7 +264,8 @@ public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, } } - this.endpoint.send(buffer, 0, encodedSize); - this.endpoint.advance(); + Sender sender = getEndpoint(); + sender.send(buffer, 0, encodedSize); + sender.advance(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
