http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java new file mode 100644 index 0000000..320d174 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.util.HashMap; +import java.util.Map; +import java.util.NoSuchElementException; + +import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery; +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.message.Message; + +public class AmqpMessage { + + private final AmqpReceiver receiver; + private final Message message; + private final Delivery delivery; + + private Map<Symbol, Object> deliveryAnnotationsMap; + private Map<Symbol, Object> messageAnnotationsMap; + private Map<String, Object> applicationPropertiesMap; + + /** + * Creates a new AmqpMessage that wraps the information necessary to handle + * an outgoing message. + */ + public AmqpMessage() { + receiver = null; + delivery = null; + + message = Proton.message(); + } + + /** + * Creates a new AmqpMessage that wraps the information necessary to handle + * an outgoing message. + * + * @param message the Proton message that is to be sent. + */ + public AmqpMessage(Message message) { + this(null, message, null); + } + + /** + * Creates a new AmqpMessage that wraps the information necessary to handle + * an incoming delivery. + * + * @param receiver the AmqpReceiver that received this message. + * @param message the Proton message that was received. + * @param delivery the Delivery instance that produced this message. + */ + @SuppressWarnings("unchecked") + public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) { + this.receiver = receiver; + this.message = message; + this.delivery = delivery; + + if (message.getMessageAnnotations() != null) { + messageAnnotationsMap = message.getMessageAnnotations().getValue(); + } + + if (message.getApplicationProperties() != null) { + applicationPropertiesMap = message.getApplicationProperties().getValue(); + } + + if (message.getDeliveryAnnotations() != null) { + deliveryAnnotationsMap = message.getDeliveryAnnotations().getValue(); + } + } + + //----- Access to interal client resources -------------------------------// + + /** + * @return the AMQP Delivery object linked to a received message. + */ + public Delivery getWrappedDelivery() { + if (delivery != null) { + return new UnmodifiableDelivery(delivery); + } + + return null; + } + + /** + * @return the AMQP Message that is wrapped by this object. + */ + public Message getWrappedMessage() { + return message; + } + + /** + * @return the AmqpReceiver that consumed this message. + */ + public AmqpReceiver getAmqpReceiver() { + return receiver; + } + + //----- Message disposition control --------------------------------------// + + /** + * Accepts the message marking it as consumed on the remote peer. + * + * @throws Exception if an error occurs during the accept. + */ + public void accept() throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't accept non-received message."); + } + + receiver.accept(delivery); + } + + /** + * Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here. + * + * @param deliveryFailed indicates that the delivery failed for some reason. + * @param undeliverableHere marks the delivery as not being able to be process by link it was sent to. + * @throws Exception if an error occurs during the process. + */ + public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't modify non-received message."); + } + + receiver.modified(delivery, deliveryFailed, undeliverableHere); + } + + /** + * Release the message, remote can redeliver it elsewhere. + * + * @throws Exception if an error occurs during the reject. + */ + public void release() throws Exception { + if (receiver == null) { + throw new IllegalStateException("Can't release non-received message."); + } + + receiver.release(delivery); + } + + //----- Convenience methods for constructing outbound messages -----------// + + /** + * Sets the MessageId property on an outbound message using the provided String + * + * @param messageId the String message ID value to set. + */ + public void setMessageId(String messageId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setMessageId(messageId); + } + + /** + * Return the set MessageId value in String form, if there are no properties + * in the given message return null. + * + * @return the set message ID in String form or null if not set. + */ + public String getMessageId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getMessageId().toString(); + } + + /** + * Return the set MessageId value in the original form, if there are no properties + * in the given message return null. + * + * @return the set message ID in its original form or null if not set. + */ + public Object getRawMessageId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getMessageId(); + } + + /** + * Sets the MessageId property on an outbound message using the provided value + * + * @param messageId the message ID value to set. + */ + public void setRawMessageId(Object messageId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setMessageId(messageId); + } + + /** + * Sets the CorrelationId property on an outbound message using the provided String + * + * @param correlationId the String Correlation ID value to set. + */ + public void setCorrelationId(String correlationId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setCorrelationId(correlationId); + } + + /** + * Return the set CorrelationId value in String form, if there are no properties + * in the given message return null. + * + * @return the set correlation ID in String form or null if not set. + */ + public String getCorrelationId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getCorrelationId().toString(); + } + + /** + * Return the set CorrelationId value in the original form, if there are no properties + * in the given message return null. + * + * @return the set message ID in its original form or null if not set. + */ + public Object getRawCorrelationId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getCorrelationId(); + } + + /** + * Sets the CorrelationId property on an outbound message using the provided value + * + * @param correlationId the correlation ID value to set. + */ + public void setRawCorrelationId(Object correlationId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setCorrelationId(correlationId); + } + + /** + * Sets the GroupId property on an outbound message using the provided String + * + * @param groupId the String Group ID value to set. + */ + public void setGroupId(String groupId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setGroupId(groupId); + } + + /** + * Return the set GroupId value in String form, if there are no properties + * in the given message return null. + * + * @return the set GroupID in String form or null if not set. + */ + public String getGroupId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getGroupId(); + } + + /** + * Sets the durable header on the outgoing message. + * + * @param durable the boolean durable value to set. + */ + public void setDurable(boolean durable) { + checkReadOnly(); + lazyCreateHeader(); + getWrappedMessage().setDurable(durable); + } + + /** + * Checks the durable value in the Message Headers to determine if + * the message was sent as a durable Message. + * + * @return true if the message is marked as being durable. + */ + public boolean isDurable() { + if (message.getHeader() == null) { + return false; + } + + return message.getHeader().getDurable(); + } + + /** + * Sets a given application property on an outbound message. + * + * @param key the name to assign the new property. + * @param value the value to set for the named property. + */ + public void setApplicationProperty(String key, Object value) { + checkReadOnly(); + lazyCreateApplicationProperties(); + applicationPropertiesMap.put(key, value); + } + + /** + * Gets the application property that is mapped to the given name or null + * if no property has been set with that name. + * + * @param key the name used to lookup the property in the application properties. + * @return the propety value or null if not set. + */ + public Object getApplicationProperty(String key) { + if (applicationPropertiesMap == null) { + return null; + } + + return applicationPropertiesMap.get(key); + } + + /** + * Perform a proper annotation set on the AMQP Message based on a Symbol key and + * the target value to append to the current annotations. + * + * @param key The name of the Symbol whose value is being set. + * @param value The new value to set in the annotations of this message. + */ + public void setMessageAnnotation(String key, Object value) { + checkReadOnly(); + lazyCreateMessageAnnotations(); + messageAnnotationsMap.put(Symbol.valueOf(key), value); + } + + /** + * Given a message annotation name, lookup and return the value associated with + * that annotation name. If the message annotations have not been created yet + * then this method will always return null. + * + * @param key the Symbol name that should be looked up in the message annotations. + * @return the value of the annotation if it exists, or null if not set or not accessible. + */ + public Object getMessageAnnotation(String key) { + if (messageAnnotationsMap == null) { + return null; + } + + return messageAnnotationsMap.get(Symbol.valueOf(key)); + } + + /** + * Perform a proper delivery annotation set on the AMQP Message based on a Symbol + * key and the target value to append to the current delivery annotations. + * + * @param key The name of the Symbol whose value is being set. + * @param value The new value to set in the delivery annotations of this message. + */ + public void setDeliveryAnnotation(String key, Object value) { + checkReadOnly(); + lazyCreateDeliveryAnnotations(); + deliveryAnnotationsMap.put(Symbol.valueOf(key), value); + } + + /** + * Given a message annotation name, lookup and return the value associated with + * that annotation name. If the message annotations have not been created yet + * then this method will always return null. + * + * @param key the Symbol name that should be looked up in the message annotations. + * @return the value of the annotation if it exists, or null if not set or not accessible. + */ + public Object getDeliveryAnnotation(String key) { + if (deliveryAnnotationsMap == null) { + return null; + } + + return deliveryAnnotationsMap.get(Symbol.valueOf(key)); + } + + //----- Methods for manipulating the Message body ------------------------// + + /** + * Sets a String value into the body of an outgoing Message, throws + * an exception if this is an incoming message instance. + * + * @param value the String value to store in the Message body. + * @throws IllegalStateException if the message is read only. + */ + public void setText(String value) throws IllegalStateException { + checkReadOnly(); + AmqpValue body = new AmqpValue(value); + getWrappedMessage().setBody(body); + } + + /** + * Sets a byte array value into the body of an outgoing Message, throws + * an exception if this is an incoming message instance. + * + * @param bytes the byte array value to store in the Message body. + * @throws IllegalStateException if the message is read only. + */ + public void setBytes(byte[] bytes) throws IllegalStateException { + checkReadOnly(); + Data body = new Data(new Binary(bytes)); + getWrappedMessage().setBody(body); + } + + /** + * Sets a byte array value into the body of an outgoing Message, throws + * an exception if this is an incoming message instance. + * + * @param described the byte array value to store in the Message body. + * @throws IllegalStateException if the message is read only. + */ + public void setDescribedType(DescribedType described) throws IllegalStateException { + checkReadOnly(); + AmqpValue body = new AmqpValue(described); + getWrappedMessage().setBody(body); + } + + /** + * Attempts to retrieve the message body as an DescribedType instance. + * + * @return an DescribedType instance if one is stored in the message body. + * @throws NoSuchElementException if the body does not contain a DescribedType. + */ + public DescribedType getDescribedType() throws NoSuchElementException { + DescribedType result = null; + + if (getWrappedMessage().getBody() == null) { + return null; + } + else { + if (getWrappedMessage().getBody() instanceof AmqpValue) { + AmqpValue value = (AmqpValue) getWrappedMessage().getBody(); + + if (value.getValue() == null) { + result = null; + } + else if (value.getValue() instanceof DescribedType) { + result = (DescribedType) value.getValue(); + } + else { + throw new NoSuchElementException("Message does not contain a DescribedType body"); + } + } + } + + return result; + } + + //----- Internal implementation ------------------------------------------// + + private void checkReadOnly() throws IllegalStateException { + if (delivery != null) { + throw new IllegalStateException("Message is read only."); + } + } + + private void lazyCreateMessageAnnotations() { + if (messageAnnotationsMap == null) { + messageAnnotationsMap = new HashMap<>(); + message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap)); + } + } + + private void lazyCreateDeliveryAnnotations() { + if (deliveryAnnotationsMap == null) { + deliveryAnnotationsMap = new HashMap<>(); + message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap)); + } + } + + private void lazyCreateApplicationProperties() { + if (applicationPropertiesMap == null) { + applicationPropertiesMap = new HashMap<>(); + message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap)); + } + } + + private void lazyCreateHeader() { + if (message.getHeader() == null) { + message.setHeader(new Header()); + } + } + + private void lazyCreateProperties() { + if (message.getProperties() == null) { + message.setProperties(new Properties()); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java new file mode 100644 index 0000000..2e36e84 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpNoLocalFilter.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.DescribedType; + +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_CODE; + +/** + * A Described Type wrapper for JMS no local option for MessageConsumer. + */ +public class AmqpNoLocalFilter implements DescribedType { + + public static final AmqpNoLocalFilter NO_LOCAL = new AmqpNoLocalFilter(); + + private final String noLocal; + + public AmqpNoLocalFilter() { + this.noLocal = "NoLocalFilter{}"; + } + + @Override + public Object getDescriptor() { + return NO_LOCAL_CODE; + } + + @Override + public Object getDescribed() { + return this.noLocal; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java new file mode 100644 index 0000000..9f3bff2 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -0,0 +1,946 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import javax.jms.InvalidDestinationException; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.transport.amqp.client.util.AsyncResult; +import org.apache.activemq.transport.amqp.client.util.ClientFuture; +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver; +import org.apache.qpid.jms.JmsOperationTimedOutException; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; + +/** + * Receiver class that manages a Proton receiver endpoint. + */ +public class AmqpReceiver extends AmqpAbstractResource<Receiver> { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class); + + private final AtomicBoolean closed = new AtomicBoolean(); + private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<>(); + + private final AmqpSession session; + private final String address; + private final String receiverId; + private final Source userSpecifiedSource; + + private String subscriptionName; + private String selector; + private boolean presettle; + private boolean noLocal; + + private AsyncResult pullRequest; + private AsyncResult stopRequest; + + /** + * Create a new receiver instance. + * + * @param session The parent session that created the receiver. + * @param address The address that this receiver should listen on. + * @param receiverId The unique ID assigned to this receiver. + */ + public AmqpReceiver(AmqpSession session, String address, String receiverId) { + + if (address != null && address.isEmpty()) { + throw new IllegalArgumentException("Address cannot be empty."); + } + + this.userSpecifiedSource = null; + this.session = session; + this.address = address; + this.receiverId = receiverId; + } + + /** + * Create a new receiver instance. + * + * @param session The parent session that created the receiver. + * @param source The Source instance to use instead of creating and configuring one. + * @param receiverId The unique ID assigned to this receiver. + */ + public AmqpReceiver(AmqpSession session, Source source, String receiverId) { + + if (source == null) { + throw new IllegalArgumentException("User specified Source cannot be null"); + } + + this.session = session; + this.userSpecifiedSource = source; + this.address = source.getAddress(); + this.receiverId = receiverId; + } + + /** + * Close the receiver, a closed receiver will throw exceptions if any further send + * calls are made. + * + * @throws IOException if an error occurs while closing the receiver. + */ + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + close(request); + session.pumpToProtonTransport(request); + } + }); + + request.sync(); + } + } + + /** + * Detach the receiver, a closed receiver will throw exceptions if any further send + * calls are made. + * + * @throws IOException if an error occurs while closing the receiver. + */ + public void detach() throws IOException { + if (closed.compareAndSet(false, true)) { + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + detach(request); + session.pumpToProtonTransport(request); + } + }); + + request.sync(); + } + } + + /** + * @return this session's parent AmqpSession. + */ + public AmqpSession getSession() { + return session; + } + + /** + * @return the address that this receiver has been configured to listen on. + */ + public String getAddress() { + return address; + } + + /** + * Attempts to wait on a message to be delivered to this receiver. The receive + * call will wait indefinitely for a message to be delivered. + * + * @return a newly received message sent to this receiver. + * @throws Exception if an error occurs during the receive attempt. + */ + public AmqpMessage receive() throws Exception { + checkClosed(); + return prefetch.take(); + } + + /** + * Attempts to receive a message sent to this receiver, waiting for the given + * timeout value before giving up and returning null. + * + * @param timeout the time to wait for a new message to arrive. + * @param unit the unit of time that the timeout value represents. + * @return a newly received message or null if the time to wait period expires. + * @throws Exception if an error occurs during the receive attempt. + */ + public AmqpMessage receive(long timeout, TimeUnit unit) throws Exception { + checkClosed(); + return prefetch.poll(timeout, unit); + } + + /** + * If a message is already available in this receiver's prefetch buffer then + * it is returned immediately otherwise this methods return null without waiting. + * + * @return a newly received message or null if there is no currently available message. + * @throws Exception if an error occurs during the receive attempt. + */ + public AmqpMessage receiveNoWait() throws Exception { + checkClosed(); + return prefetch.poll(); + } + + /** + * Request a remote peer send a Message to this client waiting until one arrives. + * + * @return the pulled AmqpMessage or null if none was pulled from the remote. + * @throws IOException if an error occurs + */ + public AmqpMessage pull() throws IOException { + return pull(-1, TimeUnit.MILLISECONDS); + } + + /** + * Request a remote peer send a Message to this client using an immediate drain request. + * + * @return the pulled AmqpMessage or null if none was pulled from the remote. + * @throws IOException if an error occurs + */ + public AmqpMessage pullImmediate() throws IOException { + return pull(0, TimeUnit.MILLISECONDS); + } + + /** + * Request a remote peer send a Message to this client. + * + * {@literal timeout < 0} then it should remain open until a message is received. + * {@literal timeout = 0} then it returns a message or null if none available + * {@literal timeout > 0} then it should remain open for timeout amount of time. + * + * The timeout value when positive is given in milliseconds. + * + * @param timeout the amount of time to tell the remote peer to keep this pull request valid. + * @param unit the unit of measure that the timeout represents. + * @return the pulled AmqpMessage or null if none was pulled from the remote. + * @throws IOException if an error occurs + */ + public AmqpMessage pull(final long timeout, final TimeUnit unit) throws IOException { + checkClosed(); + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + + long timeoutMills = unit.toMillis(timeout); + + try { + LOG.trace("Pull on Receiver {} with timeout = {}", getSubscriptionName(), timeoutMills); + if (timeoutMills < 0) { + // Wait until message arrives. Just give credit if needed. + if (getEndpoint().getCredit() == 0) { + LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName()); + getEndpoint().flow(1); + } + + // Await the message arrival + pullRequest = request; + } + else if (timeoutMills == 0) { + // If we have no credit then we need to issue some so that we can + // try to fulfill the request, then drain down what is there to + // ensure we consume what is available and remove all credit. + if (getEndpoint().getCredit() == 0) { + LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName()); + getEndpoint().flow(1); + } + + // Drain immediately and wait for the message(s) to arrive, + // or a flow indicating removal of the remaining credit. + stop(request); + } + else if (timeoutMills > 0) { + // If we have no credit then we need to issue some so that we can + // try to fulfill the request, then drain down what is there to + // ensure we consume what is available and remove all credit. + if (getEndpoint().getCredit() == 0) { + LOG.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName()); + getEndpoint().flow(1); + } + + // Wait for the timeout for the message(s) to arrive, then drain if required + // and wait for remaining message(s) to arrive or a flow indicating + // removal of the remaining credit. + stopOnSchedule(timeoutMills, request); + } + + session.pumpToProtonTransport(request); + } + catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + + return prefetch.poll(); + } + + /** + * Controls the amount of credit given to the receiver link. + * + * @param credit the amount of credit to grant. + * @throws IOException if an error occurs while sending the flow. + */ + public void flow(final int credit) throws IOException { + checkClosed(); + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + getEndpoint().flow(credit); + session.pumpToProtonTransport(request); + request.onSuccess(); + } + catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * Attempts to drain a given amount of credit from the link. + * + * @param credit the amount of credit to drain. + * @throws IOException if an error occurs while sending the drain. + */ + public void drain(final int credit) throws IOException { + checkClosed(); + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + getEndpoint().drain(credit); + session.pumpToProtonTransport(request); + request.onSuccess(); + } + catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * Stops the receiver, using all link credit and waiting for in-flight messages to arrive. + * + * @throws IOException if an error occurs while sending the drain. + */ + public void stop() throws IOException { + checkClosed(); + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + stop(request); + session.pumpToProtonTransport(request); + } + catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance. + * + * @param delivery the Delivery instance to accept. + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(final Delivery delivery) throws IOException { + checkClosed(); + + if (delivery == null) { + throw new IllegalArgumentException("Delivery to accept cannot be null"); + } + + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + if (!delivery.isSettled()) { + if (session.isInTransaction()) { + Binary txnId = session.getTransactionId().getRemoteTxId(); + if (txnId != null) { + TransactionalState txState = new TransactionalState(); + txState.setOutcome(Accepted.getInstance()); + txState.setTxnId(txnId); + delivery.disposition(txState); + delivery.settle(); + session.getTransactionContext().registerTxConsumer(AmqpReceiver.this); + } + } + else { + delivery.disposition(Accepted.getInstance()); + delivery.settle(); + } + } + session.pumpToProtonTransport(request); + request.onSuccess(); + } + catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * Mark a message that was dispatched under the given Delivery instance as Modified. + * + * @param delivery the Delivery instance to mark modified. + * @param deliveryFailed indicates that the delivery failed for some reason. + * @param undeliverableHere marks the delivery as not being able to be process by link it was sent to. + * @throws IOException if an error occurs while sending the reject. + */ + public void modified(final Delivery delivery, + final Boolean deliveryFailed, + final Boolean undeliverableHere) throws IOException { + checkClosed(); + + if (delivery == null) { + throw new IllegalArgumentException("Delivery to reject cannot be null"); + } + + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + if (!delivery.isSettled()) { + Modified disposition = new Modified(); + disposition.setUndeliverableHere(undeliverableHere); + disposition.setDeliveryFailed(deliveryFailed); + delivery.disposition(disposition); + delivery.settle(); + session.pumpToProtonTransport(request); + } + request.onSuccess(); + } + catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * Release a message that was dispatched under the given Delivery instance. + * + * @param delivery the Delivery instance to release. + * @throws IOException if an error occurs while sending the release. + */ + public void release(final Delivery delivery) throws IOException { + checkClosed(); + + if (delivery == null) { + throw new IllegalArgumentException("Delivery to release cannot be null"); + } + + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + if (!delivery.isSettled()) { + delivery.disposition(Released.getInstance()); + delivery.settle(); + session.pumpToProtonTransport(request); + } + request.onSuccess(); + } + catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * @return an unmodifiable view of the underlying Receiver instance. + */ + public Receiver getReceiver() { + return new UnmodifiableReceiver(getEndpoint()); + } + + //----- Receiver configuration properties --------------------------------// + + public boolean isPresettle() { + return presettle; + } + + public void setPresettle(boolean presettle) { + this.presettle = presettle; + } + + public boolean isDurable() { + return subscriptionName != null; + } + + public String getSubscriptionName() { + return subscriptionName; + } + + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + public String getSelector() { + return selector; + } + + public void setSelector(String selector) { + this.selector = selector; + } + + public boolean isNoLocal() { + return noLocal; + } + + public void setNoLocal(boolean noLocal) { + this.noLocal = noLocal; + } + + public long getDrainTimeout() { + return session.getConnection().getDrainTimeout(); + } + + //----- Internal implementation ------------------------------------------// + + @Override + protected void doOpen() { + + Source source = userSpecifiedSource; + Target target = new Target(); + + if (source == null && address != null) { + source = new Source(); + source.setAddress(address); + configureSource(source); + } + + String receiverName = receiverId + ":" + address; + + if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) { + // In the case of Durable Topic Subscriptions the client must use the same + // receiver name which is derived from the subscription name property. + receiverName = getSubscriptionName(); + } + + Receiver receiver = session.getEndpoint().receiver(receiverName); + receiver.setSource(source); + receiver.setTarget(target); + if (isPresettle()) { + receiver.setSenderSettleMode(SenderSettleMode.SETTLED); + } + else { + receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + setEndpoint(receiver); + + super.doOpen(); + } + + @Override + protected void doOpenCompletion() { + // Verify the attach response contained a non-null Source + org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource(); + if (s != null) { + super.doOpenCompletion(); + } + else { + // No link terminus was created, the peer will now detach/close us. + } + } + + @Override + protected void doClose() { + getEndpoint().close(); + } + + @Override + protected void doDetach() { + getEndpoint().detach(); + } + + @Override + protected Exception getOpenAbortException() { + // Verify the attach response contained a non-null Source + org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource(); + if (s != null) { + return super.getOpenAbortException(); + } + else { + // No link terminus was created, the peer has detach/closed us, create IDE. + return new InvalidDestinationException("Link creation was refused"); + } + } + + @Override + protected void doOpenInspection() { + try { + getStateInspector().inspectOpenedResource(getReceiver()); + } + catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + + @Override + protected void doClosedInspection() { + try { + getStateInspector().inspectClosedResource(getReceiver()); + } + catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + + @Override + protected void doDetachedInspection() { + try { + getStateInspector().inspectDetachedResource(getReceiver()); + } + catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + + protected void configureSource(Source source) { + Map<Symbol, DescribedType> filters = new HashMap<>(); + Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL}; + + if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) { + source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + source.setDurable(TerminusDurability.UNSETTLED_STATE); + source.setDistributionMode(COPY); + } + else { + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + } + + source.setOutcomes(outcomes); + + Modified modified = new Modified(); + modified.setDeliveryFailed(true); + modified.setUndeliverableHere(false); + + source.setDefaultOutcome(modified); + + if (isNoLocal()) { + filters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL); + } + + if (getSelector() != null && !getSelector().trim().equals("")) { + filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(getSelector())); + } + + if (!filters.isEmpty()) { + source.setFilter(filters); + } + } + + @Override + public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + Delivery incoming = null; + do { + incoming = getEndpoint().current(); + if (incoming != null) { + if (incoming.isReadable() && !incoming.isPartial()) { + LOG.trace("{} has incoming Message(s).", this); + try { + processDelivery(incoming); + } + catch (Exception e) { + throw IOExceptionSupport.create(e); + } + getEndpoint().advance(); + } + else { + LOG.trace("{} has a partial incoming Message(s), deferring.", this); + incoming = null; + } + } + else { + // We have exhausted the locally queued messages on this link. + // Check if we tried to stop and have now run out of credit. + if (getEndpoint().getRemoteCredit() <= 0) { + if (stopRequest != null) { + stopRequest.onSuccess(); + stopRequest = null; + } + } + } + } while (incoming != null); + + super.processDeliveryUpdates(connection); + } + + private void processDelivery(Delivery incoming) throws Exception { + Message message = null; + try { + message = decodeIncomingMessage(incoming); + } + catch (Exception e) { + LOG.warn("Error on transform: {}", e.getMessage()); + deliveryFailed(incoming, true); + return; + } + + AmqpMessage amqpMessage = new AmqpMessage(this, message, incoming); + // Store reference to envelope in delivery context for recovery + incoming.setContext(amqpMessage); + prefetch.add(amqpMessage); + + // We processed a message, signal completion + // of a message pull request if there is one. + if (pullRequest != null) { + pullRequest.onSuccess(); + pullRequest = null; + } + } + + @Override + public void processFlowUpdates(AmqpConnection connection) throws IOException { + if (pullRequest != null || stopRequest != null) { + Receiver receiver = getEndpoint(); + if (receiver.getRemoteCredit() <= 0 && receiver.getQueued() == 0) { + if (pullRequest != null) { + pullRequest.onSuccess(); + pullRequest = null; + } + + if (stopRequest != null) { + stopRequest.onSuccess(); + stopRequest = null; + } + } + } + + LOG.trace("Consumer {} flow updated, remote credit = {}", getSubscriptionName(), getEndpoint().getRemoteCredit()); + + super.processFlowUpdates(connection); + } + + protected Message decodeIncomingMessage(Delivery incoming) { + int count; + + byte[] chunk = new byte[2048]; + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + + while ((count = getEndpoint().recv(chunk, 0, chunk.length)) > 0) { + stream.write(chunk, 0, count); + } + + byte[] messageBytes = stream.toByteArray(); + + try { + Message protonMessage = Message.Factory.create(); + protonMessage.decode(messageBytes, 0, messageBytes.length); + return protonMessage; + } + finally { + try { + stream.close(); + } + catch (IOException e) { + } + } + } + + protected void deliveryFailed(Delivery incoming, boolean expandCredit) { + Modified disposition = new Modified(); + disposition.setUndeliverableHere(true); + disposition.setDeliveryFailed(true); + incoming.disposition(disposition); + incoming.settle(); + if (expandCredit) { + getEndpoint().flow(1); + } + } + + private void stop(final AsyncResult request) { + Receiver receiver = getEndpoint(); + if (receiver.getRemoteCredit() <= 0) { + if (receiver.getQueued() == 0) { + // We have no remote credit and all the deliveries have been processed. + request.onSuccess(); + } + else { + // There are still deliveries to process, wait for them to be. + stopRequest = request; + } + } + else { + // TODO: We don't actually want the additional messages that could be sent while + // draining. We could explicitly reduce credit first, or possibly use 'echo' instead + // of drain if it was supported. We would first need to understand what happens + // if we reduce credit below the number of messages already in-flight before + // the peer sees the update. + stopRequest = request; + receiver.drain(0); + + if (getDrainTimeout() > 0) { + // If the remote doesn't respond we will close the consumer and break any + // blocked receive or stop calls that are waiting. + final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() { + @Override + public void run() { + LOG.trace("Consumer {} drain request timed out", this); + Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time"); + locallyClosed(session.getConnection(), cause); + stopRequest.onFailure(cause); + session.pumpToProtonTransport(stopRequest); + } + }, getDrainTimeout(), TimeUnit.MILLISECONDS); + + stopRequest = new ScheduledRequest(future, stopRequest); + } + } + } + + private void stopOnSchedule(long timeout, final AsyncResult request) { + LOG.trace("Receiver {} scheduling stop", this); + // We need to drain the credit if no message(s) arrive to use it. + final ScheduledFuture<?> future = getSession().getScheduler().schedule(new Runnable() { + @Override + public void run() { + LOG.trace("Receiver {} running scheduled stop", this); + if (getEndpoint().getRemoteCredit() != 0) { + stop(request); + session.pumpToProtonTransport(request); + } + } + }, timeout, TimeUnit.MILLISECONDS); + + stopRequest = new ScheduledRequest(future, request); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{ address = " + address + "}"; + } + + private void checkClosed() { + if (isClosed()) { + throw new IllegalStateException("Receiver is already closed"); + } + } + + //----- Internal Transaction state callbacks -----------------------------// + + void preCommit() { + } + + void preRollback() { + } + + void postCommit() { + } + + void postRollback() { + } + + //----- Inner classes used in message pull operations --------------------// + + protected static final class ScheduledRequest implements AsyncResult { + + private final ScheduledFuture<?> sheduledTask; + private final AsyncResult origRequest; + + public ScheduledRequest(ScheduledFuture<?> completionTask, AsyncResult origRequest) { + this.sheduledTask = completionTask; + this.origRequest = origRequest; + } + + @Override + public void onFailure(Throwable cause) { + sheduledTask.cancel(false); + origRequest.onFailure(cause); + } + + @Override + public void onSuccess() { + boolean cancelled = sheduledTask.cancel(false); + if (cancelled) { + // Signal completion. Otherwise wait for the scheduled task to do it. + origRequest.onSuccess(); + } + } + + @Override + public boolean isComplete() { + return origRequest.isComplete(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java new file mode 100644 index 0000000..0c9bb81 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpRedirectedException.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.io.IOException; + +/** + * {@link IOException} derivative that defines that the remote peer has requested that this + * connection be redirected to some alternative peer. + */ +public class AmqpRedirectedException extends IOException { + + private static final long serialVersionUID = 5872211116061710369L; + + private final String hostname; + private final String networkHost; + private final int port; + + public AmqpRedirectedException(String reason, String hostname, String networkHost, int port) { + super(reason); + + this.hostname = hostname; + this.networkHost = networkHost; + this.port = port; + } + + /** + * @return the host name of the container being redirected to. + */ + public String getHostname() { + return hostname; + } + + /** + * @return the DNS host name or IP address of the peer this connection is being redirected to. + */ + public String getNetworkHost() { + return networkHost; + } + + /** + * @return the port number on the peer this connection is being redirected to. + */ + public int getPort() { + return port; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java new file mode 100644 index 0000000..bd66659 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpResource.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.activemq.transport.amqp.client.util.AsyncResult; + +/** + * AmqpResource specification. + * + * All AMQP types should implement this interface to allow for control of state + * and configuration details. + */ +public interface AmqpResource extends AmqpEventSink { + + /** + * Perform all the work needed to open this resource and store the request + * until such time as the remote peer indicates the resource has become active. + * + * @param request The initiating request that triggered this open call. + */ + void open(AsyncResult request); + + /** + * @return if the resource has moved to the opened state on the remote. + */ + boolean isOpen(); + + /** + * Called to indicate that this resource is now remotely opened. Once opened a + * resource can start accepting incoming requests. + */ + void opened(); + + /** + * Perform all work needed to close this resource and store the request + * until such time as the remote peer indicates the resource has been closed. + * + * @param request The initiating request that triggered this close call. + */ + void close(AsyncResult request); + + /** + * Perform all work needed to detach this resource and store the request + * until such time as the remote peer indicates the resource has been detached. + * + * @param request The initiating request that triggered this detach call. + */ + void detach(AsyncResult request); + + /** + * @return if the resource has moved to the closed state on the remote. + */ + boolean isClosed(); + + /** + * Called to indicate that this resource is now remotely closed. Once closed a + * resource can not accept any incoming requests. + */ + void closed(); + + /** + * Sets the failed state for this Resource and triggers a failure signal for + * any pending ProduverRequest. + */ + void failed(); + + /** + * Called to indicate that the remote end has become closed but the resource + * was not awaiting a close. This could happen during an open request where + * the remote does not set an error condition or during normal operation. + * + * @param connection The connection that owns this resource. + */ + void remotelyClosed(AmqpConnection connection); + + /** + * Called to indicate that the local end has become closed but the resource + * was not awaiting a close. This could happen during an open request where + * the remote does not set an error condition or during normal operation. + * + * @param connection The connection that owns this resource. + * @param error The error that triggered the local close of this resource. + */ + void locallyClosed(AmqpConnection connection, Exception error); + + /** + * Sets the failed state for this Resource and triggers a failure signal for + * any pending ProduverRequest. + * + * @param cause The Exception that triggered the failure. + */ + void failed(Exception cause); + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java new file mode 100644 index 0000000..404b943 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -0,0 +1,452 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import javax.jms.InvalidDestinationException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.transport.amqp.client.util.AsyncResult; +import org.apache.activemq.transport.amqp.client.util.ClientFuture; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Outcome; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sender class that manages a Proton sender endpoint. + */ +public class AmqpSender extends AmqpAbstractResource<Sender> { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class); + private static final byte[] EMPTY_BYTE_ARRAY = new byte[]{}; + + public static final long DEFAULT_SEND_TIMEOUT = 15000; + + private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true); + private final AtomicBoolean closed = new AtomicBoolean(); + + private final AmqpSession session; + private final String address; + private final String senderId; + private final Target userSpecifiedTarget; + + private boolean presettle; + private long sendTimeout = DEFAULT_SEND_TIMEOUT; + + private final Set<Delivery> pending = new LinkedHashSet<>(); + private byte[] encodeBuffer = new byte[1024 * 8]; + + /** + * Create a new sender instance. + * + * @param session The parent session that created the session. + * @param address The address that this sender produces to. + * @param senderId The unique ID assigned to this sender. + */ + public AmqpSender(AmqpSession session, String address, String senderId) { + + if (address != null && address.isEmpty()) { + throw new IllegalArgumentException("Address cannot be empty."); + } + + this.session = session; + this.address = address; + this.senderId = senderId; + this.userSpecifiedTarget = null; + } + + /** + * Create a new sender instance using the given Target when creating the link. + * + * @param session The parent session that created the session. + * @param address The address that this sender produces to. + * @param senderId The unique ID assigned to this sender. + */ + public AmqpSender(AmqpSession session, Target target, String senderId) { + + if (target == null) { + throw new IllegalArgumentException("User specified Target cannot be null"); + } + + this.session = session; + this.userSpecifiedTarget = target; + this.address = target.getAddress(); + this.senderId = senderId; + } + + /** + * Sends the given message to this senders assigned address. + * + * @param message the message to send. + * @throws IOException if an error occurs during the send. + */ + public void send(final AmqpMessage message) throws IOException { + checkClosed(); + final ClientFuture sendRequest = new ClientFuture(); + + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + try { + doSend(message, sendRequest); + session.pumpToProtonTransport(sendRequest); + } + catch (Exception e) { + sendRequest.onFailure(e); + session.getConnection().fireClientException(e); + } + } + }); + + if (sendTimeout <= 0) { + sendRequest.sync(); + } + else { + sendRequest.sync(sendTimeout, TimeUnit.MILLISECONDS); + } + } + + /** + * Close the sender, a closed sender will throw exceptions if any further send + * calls are made. + * + * @throws IOException if an error occurs while closing the sender. + */ + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + close(request); + session.pumpToProtonTransport(request); + } + }); + + request.sync(); + } + } + + /** + * @return this session's parent AmqpSession. + */ + public AmqpSession getSession() { + return session; + } + + /** + * @return an unmodifiable view of the underlying Sender instance. + */ + public Sender getSender() { + return new UnmodifiableSender(getEndpoint()); + } + + /** + * @return the assigned address of this sender. + */ + public String getAddress() { + return address; + } + + //----- Sender configuration ---------------------------------------------// + + /** + * @return will messages be settle on send. + */ + public boolean isPresettle() { + return presettle; + } + + /** + * Configure is sent messages are marked as settled on send, defaults to false. + * + * @param presettle configure if this sender will presettle all sent messages. + */ + public void setPresettle(boolean presettle) { + this.presettle = presettle; + } + + /** + * @return the currently configured send timeout. + */ + public long getSendTimeout() { + return sendTimeout; + } + + /** + * Sets the amount of time the sender will block on a send before failing. + * + * @param sendTimeout time in milliseconds to wait. + */ + public void setSendTimeout(long sendTimeout) { + this.sendTimeout = sendTimeout; + } + + //----- Private Sender implementation ------------------------------------// + + private void checkClosed() { + if (isClosed()) { + throw new IllegalStateException("Sender is already closed"); + } + } + + @Override + protected void doOpen() { + + Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL}; + Source source = new Source(); + source.setAddress(senderId); + source.setOutcomes(outcomes); + + Target target = userSpecifiedTarget; + if (target == null) { + target = new Target(); + target.setAddress(address); + } + + String senderName = senderId + ":" + address; + + Sender sender = session.getEndpoint().sender(senderName); + sender.setSource(source); + sender.setTarget(target); + if (presettle) { + sender.setSenderSettleMode(SenderSettleMode.SETTLED); + } + else { + sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + setEndpoint(sender); + + super.doOpen(); + } + + @Override + protected void doOpenCompletion() { + // Verify the attach response contained a non-null target + org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget(); + if (t != null) { + super.doOpenCompletion(); + } + else { + // No link terminus was created, the peer will now detach/close us. + } + } + + @Override + protected void doOpenInspection() { + try { + getStateInspector().inspectOpenedResource(getSender()); + } + catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + + @Override + protected void doClosedInspection() { + try { + getStateInspector().inspectClosedResource(getSender()); + } + catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + + @Override + protected void doDetachedInspection() { + try { + getStateInspector().inspectDetachedResource(getSender()); + } + catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + + @Override + protected Exception getOpenAbortException() { + // Verify the attach response contained a non-null target + org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget(); + if (t != null) { + return super.getOpenAbortException(); + } + else { + // No link terminus was created, the peer has detach/closed us, create IDE. + return new InvalidDestinationException("Link creation was refused"); + } + } + + private void doSend(AmqpMessage message, AsyncResult request) throws Exception { + LOG.trace("Producer sending message: {}", message); + + Delivery delivery = null; + if (presettle) { + delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0); + } + else { + byte[] tag = tagGenerator.getNextTag(); + delivery = getEndpoint().delivery(tag, 0, tag.length); + } + + delivery.setContext(request); + + if (session.isInTransaction()) { + Binary amqpTxId = session.getTransactionId().getRemoteTxId(); + TransactionalState state = new TransactionalState(); + state.setTxnId(amqpTxId); + delivery.disposition(state); + } + + encodeAndSend(message.getWrappedMessage(), delivery); + + if (presettle) { + delivery.settle(); + request.onSuccess(); + } + else { + pending.add(delivery); + getEndpoint().advance(); + } + } + + private void encodeAndSend(Message message, Delivery delivery) throws IOException { + + int encodedSize; + while (true) { + try { + encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length); + break; + } + catch (java.nio.BufferOverflowException e) { + encodeBuffer = new byte[encodeBuffer.length * 2]; + } + } + + int sentSoFar = 0; + + while (true) { + int sent = getEndpoint().send(encodeBuffer, sentSoFar, encodedSize - sentSoFar); + if (sent > 0) { + sentSoFar += sent; + if ((encodedSize - sentSoFar) == 0) { + break; + } + } + else { + LOG.warn("{} failed to send any data from current Message.", this); + } + } + } + + @Override + public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + List<Delivery> toRemove = new ArrayList<>(); + + for (Delivery delivery : pending) { + DeliveryState state = delivery.getRemoteState(); + if (state == null) { + continue; + } + + Outcome outcome = null; + if (state instanceof TransactionalState) { + LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state); + outcome = ((TransactionalState) state).getOutcome(); + } + else if (state instanceof Outcome) { + outcome = (Outcome) state; + } + else { + LOG.warn("Message send updated with unsupported state: {}", state); + outcome = null; + } + + AsyncResult request = (AsyncResult) delivery.getContext(); + Exception deliveryError = null; + + if (outcome instanceof Accepted) { + LOG.trace("Outcome of delivery was accepted: {}", delivery); + if (request != null && !request.isComplete()) { + request.onSuccess(); + } + } + else if (outcome instanceof Rejected) { + LOG.trace("Outcome of delivery was rejected: {}", delivery); + ErrorCondition remoteError = ((Rejected) outcome).getError(); + if (remoteError == null) { + remoteError = getEndpoint().getRemoteCondition(); + } + + deliveryError = AmqpSupport.convertToException(remoteError); + } + else if (outcome instanceof Released) { + LOG.trace("Outcome of delivery was released: {}", delivery); + deliveryError = new IOException("Delivery failed: released by receiver"); + } + else if (outcome instanceof Modified) { + LOG.trace("Outcome of delivery was modified: {}", delivery); + deliveryError = new IOException("Delivery failed: failure at remote"); + } + + if (deliveryError != null) { + if (request != null && !request.isComplete()) { + request.onFailure(deliveryError); + } + else { + connection.fireClientException(deliveryError); + } + } + + tagGenerator.returnTag(delivery.getTag()); + delivery.settle(); + toRemove.add(delivery); + } + + pending.removeAll(toRemove); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{ address = " + address + "}"; + } +}
