http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java new file mode 100644 index 0000000..ec37710 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -0,0 +1,599 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import static org.apache.activemq.transport.amqp.AmqpSupport.COPY; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.InvalidDestinationException; + +import org.apache.activemq.transport.amqp.client.util.ClientFuture; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Receiver class that manages a Proton receiver endpoint. + */ +public class AmqpReceiver extends AmqpAbstractResource<Receiver> { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class); + + // TODO: Use constants available from Proton 0.9 + private static final Symbol ACCEPTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:accepted:list"); + private static final Symbol REJECTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:rejected:list"); + private static final Symbol MODIFIED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:modified:list"); + private static final Symbol RELEASED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:released:list"); + + private final AtomicBoolean closed = new AtomicBoolean(); + + private final BlockingQueue<AmqpMessage> prefetch = new LinkedBlockingDeque<AmqpMessage>(); + + private final AmqpSession session; + private final String address; + private final String receiverId; + + private String subscriptionName; + private String selector; + private boolean presettle; + private boolean noLocal; + + /** + * Create a new receiver instance. + * + * @param session + * The parent session that created the receiver. + * @param address + * The address that this receiver should listen on. + * @param receiverId + * The unique ID assigned to this receiver. + */ + public AmqpReceiver(AmqpSession session, String address, String receiverId) { + this.session = session; + this.address = address; + this.receiverId = receiverId; + } + + /** + * Close the sender, a closed sender will throw exceptions if any further send + * calls are made. + * + * @throws IOException if an error occurs while closing the sender. + */ + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + close(request); + session.pumpToProtonTransport(); + } + }); + + request.sync(); + } + } + + /** + * @return this session's parent AmqpSession. + */ + public AmqpSession getSession() { + return session; + } + + /** + * @return the address that this receiver has been configured to listen on. + */ + public String getAddress() { + return address; + } + + /** + * Attempts to wait on a message to be delivered to this receiver. The receive + * call will wait indefinitely for a message to be delivered. + * + * @return a newly received message sent to this receiver. + * + * @throws Exception if an error occurs during the receive attempt. + */ + public AmqpMessage receive() throws Exception { + checkClosed(); + return prefetch.take(); + } + + /** + * Attempts to receive a message sent to this receiver, waiting for the given + * timeout value before giving up and returning null. + * + * @param timeout + * the time to wait for a new message to arrive. + * @param unit + * the unit of time that the timeout value represents. + * + * @return a newly received message or null if the time to wait period expires. + * + * @throws Exception if an error occurs during the receive attempt. + */ + public AmqpMessage receive(long timeout, TimeUnit unit) throws Exception { + checkClosed(); + return prefetch.poll(timeout, unit); + } + + /** + * If a message is already available in this receiver's prefetch buffer then + * it is returned immediately otherwise this methods return null without waiting. + * + * @return a newly received message or null if there is no currently available message. + * + * @throws Exception if an error occurs during the receive attempt. + */ + public AmqpMessage receiveNoWait() throws Exception { + checkClosed(); + return prefetch.poll(); + } + + /** + * Controls the amount of credit given to the receiver link. + * + * @param credit + * the amount of credit to grant. + * + * @throws IOException if an error occurs while sending the flow. + */ + public void flow(final int credit) throws IOException { + checkClosed(); + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + getEndpoint().flow(credit); + session.pumpToProtonTransport(); + request.onSuccess(); + } catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * Attempts to drain a given amount of credit from the link. + * + * @param credit + * the amount of credit to drain. + * + * @throws IOException if an error occurs while sending the drain. + */ + public void drain(final int credit) throws IOException { + checkClosed(); + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + getEndpoint().drain(credit); + session.pumpToProtonTransport(); + request.onSuccess(); + } catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * Accepts a message that was dispatched under the given Delivery instance. + * + * @param delivery + * the Delivery instance to accept. + * + * @throws IOException if an error occurs while sending the accept. + */ + public void accept(final Delivery delivery) throws IOException { + checkClosed(); + + if (delivery == null) { + throw new IllegalArgumentException("Delivery to accept cannot be null"); + } + + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + if (!delivery.isSettled()) { + delivery.disposition(Accepted.getInstance()); + delivery.settle(); + session.pumpToProtonTransport(); + } + request.onSuccess(); + } catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * Reject a message that was dispatched under the given Delivery instance. + * + * @param delivery + * the Delivery instance to reject. + * @param undeliverableHere + * marks the delivery as not being able to be process by link it was sent to. + * @param deliveryFailed + * indicates that the delivery failed for some reason. + * + * @throws IOException if an error occurs while sending the reject. + */ + public void reject(final Delivery delivery, final boolean undeliverableHere, final boolean deliveryFailed) throws IOException { + checkClosed(); + + if (delivery == null) { + throw new IllegalArgumentException("Delivery to reject cannot be null"); + } + + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + if (!delivery.isSettled()) { + Modified disposition = new Modified(); + disposition.setUndeliverableHere(undeliverableHere); + disposition.setDeliveryFailed(deliveryFailed); + delivery.disposition(disposition); + delivery.settle(); + session.pumpToProtonTransport(); + } + request.onSuccess(); + } catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * Release a message that was dispatched under the given Delivery instance. + * + * @param delivery + * the Delivery instance to release. + * + * @throws IOException if an error occurs while sending the release. + */ + public void release(final Delivery delivery) throws IOException { + checkClosed(); + + if (delivery == null) { + throw new IllegalArgumentException("Delivery to release cannot be null"); + } + + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + if (!delivery.isSettled()) { + delivery.disposition(Released.getInstance()); + delivery.settle(); + session.pumpToProtonTransport(); + } + request.onSuccess(); + } catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * @return an unmodifiable view of the underlying Receiver instance. + */ + public Receiver getReceiver() { + return new UnmodifiableReceiver(getEndpoint()); + } + + //----- Receiver configuration properties --------------------------------// + + public boolean isPresettle() { + return presettle; + } + + public void setPresettle(boolean presettle) { + this.presettle = presettle; + } + + public boolean isDurable() { + return subscriptionName != null; + } + + public String getSubscriptionName() { + return subscriptionName; + } + + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + public String getSelector() { + return selector; + } + + public void setSelector(String selector) { + this.selector = selector; + } + + public boolean isNoLocal() { + return noLocal; + } + + public void setNoLocal(boolean noLocal) { + this.noLocal = noLocal; + } + + //----- Internal implementation ------------------------------------------// + + @Override + protected void doOpen() { + + Source source = new Source(); + source.setAddress(address); + Target target = new Target(); + + configureSource(source); + + String receiverName = receiverId + ":" + address; + + if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) { + // In the case of Durable Topic Subscriptions the client must use the same + // receiver name which is derived from the subscription name property. + receiverName = getSubscriptionName(); + } + + Receiver receiver = session.getEndpoint().receiver(receiverName); + receiver.setSource(source); + receiver.setTarget(target); + if (isPresettle()) { + receiver.setSenderSettleMode(SenderSettleMode.SETTLED); + } else { + receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + setEndpoint(receiver); + + super.doOpen(); + } + + @Override + protected void doOpenCompletion() { + // Verify the attach response contained a non-null Source + org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource(); + if (s != null) { + super.doOpenCompletion(); + } else { + // No link terminus was created, the peer will now detach/close us. + } + } + + @Override + protected void doClose() { + if (isDurable()) { + getEndpoint().detach(); + } else { + getEndpoint().close(); + } + } + + @Override + protected Exception getOpenAbortException() { + // Verify the attach response contained a non-null Source + org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource(); + if (s != null) { + return super.getOpenAbortException(); + } else { + // No link terminus was created, the peer has detach/closed us, create IDE. + return new InvalidDestinationException("Link creation was refused"); + } + } + + @Override + protected void doOpenInspection() { + getStateInspector().inspectOpenedResource(getReceiver()); + } + + @Override + protected void doClosedInspection() { + getStateInspector().inspectClosedResource(getReceiver()); + } + + @Override + protected void doDetachedInspection() { + getStateInspector().inspectDetachedResource(getReceiver()); + } + + protected void configureSource(Source source) { + Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>(); + Symbol[] outcomes = new Symbol[]{ACCEPTED_DESCRIPTOR_SYMBOL, REJECTED_DESCRIPTOR_SYMBOL, + RELEASED_DESCRIPTOR_SYMBOL, MODIFIED_DESCRIPTOR_SYMBOL}; + + if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) { + source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + source.setDurable(TerminusDurability.UNSETTLED_STATE); + source.setDistributionMode(COPY); + } else { + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + } + + source.setOutcomes(outcomes); + + Modified modified = new Modified(); + modified.setDeliveryFailed(true); + modified.setUndeliverableHere(false); + + source.setDefaultOutcome(modified); + + if (isNoLocal()) { + filters.put(NO_LOCAL_NAME, AmqpNoLocalType.NO_LOCAL); + } + + if (getSelector() != null && !getSelector().trim().equals("")) { + filters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorType(getSelector())); + } + + if (!filters.isEmpty()) { + source.setFilter(filters); + } + } + + @Override + public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + Delivery incoming = null; + do { + incoming = getEndpoint().current(); + if (incoming != null) { + if(incoming.isReadable() && !incoming.isPartial()) { + LOG.trace("{} has incoming Message(s).", this); + try { + processDelivery(incoming); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } + getEndpoint().advance(); + } else { + LOG.trace("{} has a partial incoming Message(s), deferring.", this); + incoming = null; + } + } + } while (incoming != null); + + super.processDeliveryUpdates(connection); + } + + private void processDelivery(Delivery incoming) throws Exception { + Message message = null; + try { + message = decodeIncomingMessage(incoming); + } catch (Exception e) { + LOG.warn("Error on transform: {}", e.getMessage()); + deliveryFailed(incoming, true); + return; + } + + AmqpMessage amqpMessage = new AmqpMessage(this, message, incoming); + // Store reference to envelope in delivery context for recovery + incoming.setContext(amqpMessage); + prefetch.add(amqpMessage); + } + + protected Message decodeIncomingMessage(Delivery incoming) { + int count; + + byte[] chunk = new byte[2048]; + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + + while ((count = getEndpoint().recv(chunk, 0, chunk.length)) > 0) { + stream.write(chunk, 0, count); + } + + byte[] messageBytes = stream.toByteArray(); + + try { + Message protonMessage = Message.Factory.create(); + protonMessage.decode(messageBytes, 0, messageBytes.length); + return protonMessage; + } finally { + try { + stream.close(); + } catch (IOException e) { + } + } + } + + protected void deliveryFailed(Delivery incoming, boolean expandCredit) { + Modified disposition = new Modified(); + disposition.setUndeliverableHere(true); + disposition.setDeliveryFailed(true); + incoming.disposition(disposition); + incoming.settle(); + if (expandCredit) { + getEndpoint().flow(1); + } + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{ address = " + address + "}"; + } + + private void checkClosed() { + if (isClosed()) { + throw new IllegalStateException("Receiver is already closed"); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java new file mode 100644 index 0000000..b4e6215 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.io.IOException; + +import org.apache.activemq.transport.amqp.client.util.AsyncResult; + +/** + * AmqpResource specification. + * + * All AMQP types should implement this interface to allow for control of state + * and configuration details. + */ +public interface AmqpResource { + + /** + * Perform all the work needed to open this resource and store the request + * until such time as the remote peer indicates the resource has become active. + * + * @param request + * The initiating request that triggered this open call. + */ + void open(AsyncResult request); + + /** + * @return if the resource has moved to the opened state on the remote. + */ + boolean isOpen(); + + /** + * Called to indicate that this resource is now remotely opened. Once opened a + * resource can start accepting incoming requests. + */ + void opened(); + + /** + * Perform all work needed to close this resource and store the request + * until such time as the remote peer indicates the resource has been closed. + * + * @param request + * The initiating request that triggered this close call. + */ + void close(AsyncResult request); + + /** + * @return if the resource has moved to the closed state on the remote. + */ + boolean isClosed(); + + /** + * Called to indicate that this resource is now remotely closed. Once closed a + * resource can not accept any incoming requests. + */ + void closed(); + + /** + * Sets the failed state for this Resource and triggers a failure signal for + * any pending ProduverRequest. + */ + void failed(); + + /** + * Called to indicate that the remote end has become closed but the resource + * was not awaiting a close. This could happen during an open request where + * the remote does not set an error condition or during normal operation. + * + * @param connection + * The connection that owns this resource. + */ + void remotelyClosed(AmqpConnection connection); + + /** + * Sets the failed state for this Resource and triggers a failure signal for + * any pending ProduverRequest. + * + * @param cause + * The Exception that triggered the failure. + */ + void failed(Exception cause); + + /** + * Event handler for remote peer open of this resource. + * + * @param connection + * The connection that owns this resource. + * + * @throws IOException if an error occurs while processing the update. + */ + void processRemoteOpen(AmqpConnection connection) throws IOException; + + /** + * Event handler for remote peer detach of this resource. + * + * @param connection + * The connection that owns this resource. + * + * @throws IOException if an error occurs while processing the update. + */ + void processRemoteDetach(AmqpConnection connection) throws IOException; + + /** + * Event handler for remote peer close of this resource. + * + * @param connection + * The connection that owns this resource. + * + * @throws IOException if an error occurs while processing the update. + */ + void processRemoteClose(AmqpConnection connection) throws IOException; + + /** + * Called when the Proton Engine signals an Delivery related event has been triggered + * for the given endpoint. + * + * @param connection + * The connection that owns this resource. + * + * @throws IOException if an error occurs while processing the update. + */ + void processDeliveryUpdates(AmqpConnection connection) throws IOException; + + /** + * Called when the Proton Engine signals an Flow related event has been triggered + * for the given endpoint. + * + * @param connection + * The connection that owns this resource. + * + * @throws IOException if an error occurs while processing the update. + */ + void processFlowUpdates(AmqpConnection connection) throws IOException; + + /** + * @returns true if the remote end has sent an error + */ + boolean hasRemoteError(); + + /** + * @return an Exception derived from the error state of the endpoint's Remote Condition. + */ + Exception getRemoteError(); + + /** + * @return an Error message derived from the error state of the endpoint's Remote Condition. + */ + String getRemoteErrorMessage(); + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java new file mode 100644 index 0000000..95b0743 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.InvalidDestinationException; + +import org.apache.activemq.transport.amqp.client.util.AsyncResult; +import org.apache.activemq.transport.amqp.client.util.ClientFuture; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Outcome; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sender class that manages a Proton sender endpoint. + */ +public class AmqpSender extends AmqpAbstractResource<Sender> { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class); + private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; + //TODO: Use constants available from Proton 0.9 + private static final Symbol ACCEPTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:accepted:list"); + private static final Symbol REJECTED_DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:rejected:list"); + + public static final long DEFAULT_SEND_TIMEOUT = 15000; + + private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true); + private final AtomicBoolean closed = new AtomicBoolean(); + + private final AmqpSession session; + private final String address; + private final String senderId; + + private boolean presettle; + private long sendTimeout = DEFAULT_SEND_TIMEOUT; + + private final Set<Delivery> pending = new LinkedHashSet<Delivery>(); + private byte[] encodeBuffer = new byte[1024 * 8]; + + /** + * Create a new sender instance. + * + * @param session + * The parent session that created the session. + * @param address + * The address that this sender produces to. + * @param senderId + * The unique ID assigned to this sender. + */ + public AmqpSender(AmqpSession session, String address, String senderId) { + this.session = session; + this.address = address; + this.senderId = senderId; + } + + /** + * Sends the given message to this senders assigned address. + * + * @param message + * the message to send. + * + * @throws IOException if an error occurs during the send. + */ + public void send(final AmqpMessage message) throws IOException { + checkClosed(); + final ClientFuture sendRequest = new ClientFuture(); + + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + try { + doSend(message, sendRequest); + session.pumpToProtonTransport(); + } catch (Exception e) { + sendRequest.onFailure(e); + session.getConnection().fireClientException(e); + } + } + }); + + if (sendTimeout <= 0) { + sendRequest.sync(); + } else { + sendRequest.sync(sendTimeout, TimeUnit.MILLISECONDS); + } + } + + /** + * Close the sender, a closed sender will throw exceptions if any further send + * calls are made. + * + * @throws IOException if an error occurs while closing the sender. + */ + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + close(request); + session.pumpToProtonTransport(); + } + }); + + request.sync(); + } + } + + /** + * @return this session's parent AmqpSession. + */ + public AmqpSession getSession() { + return session; + } + + /** + * @return an unmodifiable view of the underlying Sender instance. + */ + public Sender getSender() { + return new UnmodifiableSender(getEndpoint()); + } + + /** + * @return the assigned address of this sender. + */ + public String getAddress() { + return address; + } + + //----- Sender configuration ---------------------------------------------// + + /** + * @return will messages be settle on send. + */ + public boolean isPresettle() { + return presettle; + } + + /** + * Configure is sent messages are marked as settled on send, defaults to false. + * + * @param presettle + * configure if this sender will presettle all sent messages. + */ + public void setPresettle(boolean presettle) { + this.presettle = presettle; + } + + /** + * @return the currently configured send timeout. + */ + public long getSendTimeout() { + return sendTimeout; + } + + /** + * Sets the amount of time the sender will block on a send before failing. + * + * @param sendTimeout + * time in milliseconds to wait. + */ + public void setSendTimeout(long sendTimeout) { + this.sendTimeout = sendTimeout; + } + + //----- Private Sender implementation ------------------------------------// + + private void checkClosed() { + if (isClosed()) { + throw new IllegalStateException("Sender is already closed"); + } + } + + @Override + protected void doOpen() { + + Symbol[] outcomes = new Symbol[]{ACCEPTED_DESCRIPTOR_SYMBOL, REJECTED_DESCRIPTOR_SYMBOL}; + Source source = new Source(); + source.setAddress(senderId); + source.setOutcomes(outcomes); + + Target target = new Target(); + target.setAddress(address); + + String senderName = senderId + ":" + address; + + Sender sender = session.getEndpoint().sender(senderName); + sender.setSource(source); + sender.setTarget(target); + if (presettle) { + sender.setSenderSettleMode(SenderSettleMode.SETTLED); + } else { + sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + setEndpoint(sender); + + super.doOpen(); + } + + @Override + protected void doOpenCompletion() { + // Verify the attach response contained a non-null target + org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget(); + if (t != null) { + super.doOpenCompletion(); + } else { + // No link terminus was created, the peer will now detach/close us. + } + } + + @Override + protected void doOpenInspection() { + getStateInspector().inspectOpenedResource(getSender()); + } + + @Override + protected void doClosedInspection() { + getStateInspector().inspectClosedResource(getSender()); + } + + @Override + protected void doDetachedInspection() { + getStateInspector().inspectDetachedResource(getSender()); + } + + @Override + protected Exception getOpenAbortException() { + // Verify the attach response contained a non-null target + org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget(); + if (t != null) { + return super.getOpenAbortException(); + } else { + // No link terminus was created, the peer has detach/closed us, create IDE. + return new InvalidDestinationException("Link creation was refused"); + } + } + + private void doSend(AmqpMessage message, AsyncResult request) throws Exception { + + LOG.trace("Producer sending message: {}", message); + + byte[] tag = tagGenerator.getNextTag(); + Delivery delivery = null; + + if (presettle) { + delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0); + } else { + delivery = getEndpoint().delivery(tag, 0, tag.length); + } + + delivery.setContext(request); + + encodeAndSend(message.getWrappedMessage(), delivery); + + if (presettle) { + delivery.settle(); + request.onSuccess(); + } else { + pending.add(delivery); + getEndpoint().advance(); + } + } + + private void encodeAndSend(Message message, Delivery delivery) throws IOException { + + int encodedSize; + while (true) { + try { + encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length); + break; + } catch (java.nio.BufferOverflowException e) { + encodeBuffer = new byte[encodeBuffer.length * 2]; + } + } + + int sentSoFar = 0; + + while (true) { + int sent = getEndpoint().send(encodeBuffer, sentSoFar, encodedSize - sentSoFar); + if (sent > 0) { + sentSoFar += sent; + if ((encodedSize - sentSoFar) == 0) { + break; + } + } else { + LOG.warn("{} failed to send any data from current Message.", this); + } + } + } + + @Override + public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + List<Delivery> toRemove = new ArrayList<Delivery>(); + + for (Delivery delivery : pending) { + DeliveryState state = delivery.getRemoteState(); + if (state == null) { + continue; + } + + Outcome outcome = null; + if (state instanceof TransactionalState) { + LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state); + outcome = ((TransactionalState) state).getOutcome(); + } else if (state instanceof Outcome) { + outcome = (Outcome) state; + } else { + LOG.warn("Message send updated with unsupported state: {}", state); + continue; + } + + AsyncResult request = (AsyncResult) delivery.getContext(); + + if (outcome instanceof Accepted) { + toRemove.add(delivery); + LOG.trace("Outcome of delivery was accepted: {}", delivery); + tagGenerator.returnTag(delivery.getTag()); + if (request != null && !request.isComplete()) { + request.onSuccess(); + } + } else if (outcome instanceof Rejected) { + Exception remoteError = getRemoteError(); + toRemove.add(delivery); + LOG.trace("Outcome of delivery was rejected: {}", delivery); + tagGenerator.returnTag(delivery.getTag()); + if (request != null && !request.isComplete()) { + request.onFailure(remoteError); + } else { + connection.fireClientException(getRemoteError()); + } + } else { + LOG.warn("Message send updated with unsupported outcome: {}", outcome); + } + } + + pending.removeAll(toRemove); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{ address = " + address + "}"; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java new file mode 100644 index 0000000..9368b26 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.transport.amqp.client.util.ClientFuture; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Session; + +/** + * Session class that manages a Proton session endpoint. + */ +public class AmqpSession extends AmqpAbstractResource<Session> { + + private final AtomicLong receiverIdGenerator = new AtomicLong(); + private final AtomicLong senderIdGenerator = new AtomicLong(); + + private final AmqpConnection connection; + private final String sessionId; + + /** + * Create a new session instance. + * + * @param connection + * The parent connection that created the session. + * @param sessionId + * The unique ID value assigned to this session. + */ + public AmqpSession(AmqpConnection connection, String sessionId) { + this.connection = connection; + this.sessionId = sessionId; + } + + /** + * Create a sender instance using the given address + * + * @param address + * the address to which the sender will produce its messages. + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address) throws Exception { + checkClosed(); + + final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId()); + final ClientFuture request = new ClientFuture(); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + sender.open(request); + pumpToProtonTransport(); + } + }); + + request.sync(); + + return sender; + } + + /** + * Create a receiver instance using the given address + * + * @param address + * the address to which the receiver will subscribe for its messages. + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(String address) throws Exception { + checkClosed(); + + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId()); + final ClientFuture request = new ClientFuture(); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.open(request); + pumpToProtonTransport(); + } + }); + + request.sync(); + + return receiver; + } + + /** + * @return this session's parent AmqpConnection. + */ + public AmqpConnection getConnection() { + return connection; + } + + public Session getSession() { + return new UnmodifiableSession(getEndpoint()); + } + + //----- Internal getters used from the child AmqpResource classes --------// + + ScheduledExecutorService getScheduler() { + return connection.getScheduler(); + } + + Connection getProtonConnection() { + return connection.getProtonConnection(); + } + + void pumpToProtonTransport() { + connection.pumpToProtonTransport(); + } + + //----- Private implementation details -----------------------------------// + + @Override + protected void doOpenInspection() { + getStateInspector().inspectOpenedResource(getSession()); + } + + @Override + protected void doClosedInspection() { + getStateInspector().inspectClosedResource(getSession()); + } + + private String getNextSenderId() { + return sessionId + ":" + senderIdGenerator.incrementAndGet(); + } + + private String getNextReceiverId() { + return sessionId + ":" + receiverIdGenerator.incrementAndGet(); + } + + private void checkClosed() { + if (isClosed()) { + throw new IllegalStateException("Session is already closed"); + } + } + + @Override + public String toString() { + return "AmqpSession { " + sessionId + " }"; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java new file mode 100644 index 0000000..5471876 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpStateInspector.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Session; + +/** + * Abstract base for a validation hook that is used in tests to check + * the state of a remote resource after a variety of lifecycle events. + */ +public class AmqpStateInspector { + + private boolean valid = true; + private String errorMessage; + + public void inspectOpenedResource(Connection connection) { + + } + + public void inspectOpenedResource(Session session) { + + } + + public void inspectOpenedResource(Link link) { + + } + + public void inspectClosedResource(Connection remoteConnection) { + + } + + public void inspectClosedResource(Session session) { + + } + + public void inspectClosedResource(Link link) { + + } + + public void inspectDetachedResource(Link link) { + + } + + public boolean isValid() { + return valid; + } + + protected void setValid(boolean valid) { + this.valid = valid; + } + + public String getErrorMessage() { + return errorMessage; + } + + protected void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + protected void markAsInvalid(String errorMessage) { + if (valid) { + setValid(false); + setErrorMessage(errorMessage); + } + } + + public void assertIfStateChecksFailed() { + if (!isValid()) { + throw new AssertionError(errorMessage); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java new file mode 100644 index 0000000..08db018 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.io.UnsupportedEncodingException; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * Utility class that can generate and if enabled pool the binary tag values + * used to identify transfers over an AMQP link. + */ +public final class AmqpTransferTagGenerator { + + public static final int DEFAULT_TAG_POOL_SIZE = 1024; + + private long nextTagId; + private int maxPoolSize = DEFAULT_TAG_POOL_SIZE; + + private final Set<byte[]> tagPool; + + public AmqpTransferTagGenerator() { + this(false); + } + + public AmqpTransferTagGenerator(boolean pool) { + if (pool) { + this.tagPool = new LinkedHashSet<byte[]>(); + } else { + this.tagPool = null; + } + } + + /** + * Retrieves the next available tag. + * + * @return a new or unused tag depending on the pool option. + */ + public byte[] getNextTag() { + byte[] rc; + if (tagPool != null && !tagPool.isEmpty()) { + final Iterator<byte[]> iterator = tagPool.iterator(); + rc = iterator.next(); + iterator.remove(); + } else { + try { + rc = Long.toHexString(nextTagId++).getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + // This should never happen since we control the input. + throw new RuntimeException(e); + } + } + return rc; + } + + /** + * When used as a pooled cache of tags the unused tags should always be returned once + * the transfer has been settled. + * + * @param data + * a previously borrowed tag that is no longer in use. + */ + public void returnTag(byte[] data) { + if (tagPool != null && tagPool.size() < maxPoolSize) { + tagPool.add(data); + } + } + + /** + * Gets the current max pool size value. + * + * @return the current max tag pool size. + */ + public int getMaxPoolSize() { + return maxPoolSize; + } + + /** + * Sets the max tag pool size. If the size is smaller than the current number + * of pooled tags the pool will drain over time until it matches the max. + * + * @param maxPoolSize + * the maximum number of tags to hold in the pool. + */ + public void setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java new file mode 100644 index 0000000..953038a --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +import java.util.HashMap; +import java.util.Map; + +/** + * Base class for SASL Authentication Mechanism that implements the basic + * methods of a Mechanism class. + */ +public abstract class AbstractMechanism implements Mechanism { + + protected static final byte[] EMPTY = new byte[0]; + + private String username; + private String password; + private Map<String, Object> properties = new HashMap<String, Object>(); + + @Override + public int compareTo(Mechanism other) { + + if (getPriority() < other.getPriority()) { + return -1; + } else if (getPriority() > other.getPriority()) { + return 1; + } + + return 0; + } + + @Override + public void setUsername(String value) { + this.username = value; + } + + @Override + public String getUsername() { + return username; + } + + @Override + public void setPassword(String value) { + this.password = value; + } + + @Override + public String getPassword() { + return this.password; + } + + @Override + public void setProperties(Map<String, Object> properties) { + this.properties = properties; + } + + @Override + public Map<String, Object> getProperties() { + return this.properties; + } + + @Override + public String toString() { + return "SASL-" + getName(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java new file mode 100644 index 0000000..8ac8b61 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +/** + * Implements the Anonymous SASL authentication mechanism. + */ +public class AnonymousMechanism extends AbstractMechanism { + + @Override + public byte[] getInitialResponse() { + return EMPTY; + } + + @Override + public byte[] getChallengeResponse(byte[] challenge) { + return EMPTY; + } + + @Override + public int getPriority() { + return PRIORITY.LOWEST.getValue(); + } + + @Override + public String getName() { + return "ANONYMOUS"; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java new file mode 100644 index 0000000..638fb2e --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +import java.io.UnsupportedEncodingException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.security.sasl.SaslException; + +/** + * Implements the SASL PLAIN authentication Mechanism. + * + * User name and Password values are sent without being encrypted. + */ +public class CramMD5Mechanism extends AbstractMechanism { + + private static final String ASCII = "ASCII"; + private static final String HMACMD5 = "HMACMD5"; + private boolean sentResponse; + + @Override + public int getPriority() { + return PRIORITY.HIGH.getValue(); + } + + @Override + public String getName() { + return "CRAM-MD5"; + } + + @Override + public byte[] getInitialResponse() { + return EMPTY; + } + + @Override + public byte[] getChallengeResponse(byte[] challenge) throws SaslException { + if (!sentResponse && challenge != null && challenge.length != 0) { + try { + SecretKeySpec key = new SecretKeySpec(getPassword().getBytes(ASCII), HMACMD5); + Mac mac = Mac.getInstance(HMACMD5); + mac.init(key); + + byte[] bytes = mac.doFinal(challenge); + + StringBuffer hash = new StringBuffer(getUsername()); + hash.append(' '); + for (int i = 0; i < bytes.length; i++) { + String hex = Integer.toHexString(0xFF & bytes[i]); + if (hex.length() == 1) { + hash.append('0'); + } + hash.append(hex); + } + + sentResponse = true; + return hash.toString().getBytes(ASCII); + } catch (UnsupportedEncodingException e) { + throw new SaslException("Unable to utilise required encoding", e); + } catch (InvalidKeyException e) { + throw new SaslException("Unable to utilise key", e); + } catch (NoSuchAlgorithmException e) { + throw new SaslException("Unable to utilise required algorithm", e); + } + } else { + return EMPTY; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java new file mode 100644 index 0000000..e1296f9 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +import java.util.Map; + +import javax.security.sasl.SaslException; + +/** + * Interface for all SASL authentication mechanism implementations. + */ +public interface Mechanism extends Comparable<Mechanism> { + + /** + * Relative priority values used to arrange the found SASL + * mechanisms in a preferred order where the level of security + * generally defines the preference. + */ + public enum PRIORITY { + LOWEST(0), + LOW(1), + MEDIUM(2), + HIGH(3), + HIGHEST(4); + + private final int value; + + private PRIORITY(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + }; + + /** + * @return return the relative priority of this SASL mechanism. + */ + int getPriority(); + + /** + * @return the well known name of this SASL mechanism. + */ + String getName(); + + /** + * @return the response buffer used to answer the initial SASL cycle. + * @throws SaslException if an error occurs computing the response. + */ + byte[] getInitialResponse() throws SaslException; + + /** + * Create a response based on a given challenge from the remote peer. + * + * @param challenge + * the challenge that this Mechanism should response to. + * + * @return the response that answers the given challenge. + * @throws SaslException if an error occurs computing the response. + */ + byte[] getChallengeResponse(byte[] challenge) throws SaslException; + + /** + * Sets the user name value for this Mechanism. The Mechanism can ignore this + * value if it does not utilize user name in it's authentication processing. + * + * @param username + * The user name given. + */ + void setUsername(String value); + + /** + * Returns the configured user name value for this Mechanism. + * + * @return the currently set user name value for this Mechanism. + */ + String getUsername(); + + /** + * Sets the password value for this Mechanism. The Mechanism can ignore this + * value if it does not utilize a password in it's authentication processing. + * + * @param username + * The user name given. + */ + void setPassword(String value); + + /** + * Returns the configured password value for this Mechanism. + * + * @return the currently set password value for this Mechanism. + */ + String getPassword(); + + /** + * Sets any additional Mechanism specific properties using a Map<String, Object> + * + * @param options + * the map of additional properties that this Mechanism should utilize. + */ + void setProperties(Map<String, Object> options); + + /** + * The currently set Properties for this Mechanism. + * + * @return the current set of configuration Properties for this Mechanism. + */ + Map<String, Object> getProperties(); + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java new file mode 100644 index 0000000..ce26124 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +/** + * Implements the SASL PLAIN authentication Mechanism. + * + * User name and Password values are sent without being encrypted. + */ +public class PlainMechanism extends AbstractMechanism { + + @Override + public int getPriority() { + return PRIORITY.MEDIUM.getValue(); + } + + @Override + public String getName() { + return "PLAIN"; + } + + @Override + public byte[] getInitialResponse() { + + String username = getUsername(); + String password = getPassword(); + + if (username == null) { + username = ""; + } + + if (password == null) { + password = ""; + } + + byte[] usernameBytes = username.getBytes(); + byte[] passwordBytes = password.getBytes(); + byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2]; + System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length); + System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length); + return data; + } + + @Override + public byte[] getChallengeResponse(byte[] challenge) { + return EMPTY; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java new file mode 100644 index 0000000..818ddff --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.jms.JMSSecurityException; +import javax.security.sasl.SaslException; + +import org.apache.qpid.proton.engine.Sasl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manage the SASL authentication process + */ +public class SaslAuthenticator { + + private static final Logger LOG = LoggerFactory.getLogger(SaslAuthenticator.class); + + private final Sasl sasl; + private final String username; + private final String password; + private Mechanism mechanism; + + /** + * Create the authenticator and initialize it. + * + * @param sasl + * The Proton SASL entry point this class will use to manage the authentication. + * @param username + * The user name that will be used to authenticate. + * @param password + * The password that will be used to authenticate. + */ + public SaslAuthenticator(Sasl sasl, String username, String password) { + this.sasl = sasl; + this.username = username; + this.password = password; + } + + /** + * Process the SASL authentication cycle until such time as an outcome is determine. This + * method must be called by the managing entity until the return value is true indicating a + * successful authentication or a JMSSecurityException is thrown indicating that the + * handshake failed. + * + * @throws JMSSecurityException + */ + public boolean authenticate() throws SecurityException { + switch (sasl.getState()) { + case PN_SASL_IDLE: + handleSaslInit(); + break; + case PN_SASL_STEP: + handleSaslStep(); + break; + case PN_SASL_FAIL: + handleSaslFail(); + break; + case PN_SASL_PASS: + return true; + default: + } + + return false; + } + + private void handleSaslInit() throws SecurityException { + try { + String[] remoteMechanisms = sasl.getRemoteMechanisms(); + if (remoteMechanisms != null && remoteMechanisms.length != 0) { + mechanism = findMatchingMechanism(remoteMechanisms); + if (mechanism != null) { + mechanism.setUsername(username); + mechanism.setPassword(password); + // TODO - set additional options from URI. + // TODO - set a host value. + + sasl.setMechanisms(mechanism.getName()); + byte[] response = mechanism.getInitialResponse(); + if (response != null && response.length != 0) { + sasl.send(response, 0, response.length); + } + } else { + // TODO - Better error message. + throw new SecurityException("Could not find a matching SASL mechanism for the remote peer."); + } + } + } catch (SaslException se) { + // TODO - Better error message. + SecurityException jmsse = new SecurityException("Exception while processing SASL init."); + jmsse.initCause(se); + throw jmsse; + } + } + + private Mechanism findMatchingMechanism(String...remoteMechanisms) { + + Mechanism match = null; + List<Mechanism> found = new ArrayList<Mechanism>(); + + for (String remoteMechanism : remoteMechanisms) { + if (remoteMechanism.equalsIgnoreCase("PLAIN")) { + found.add(new PlainMechanism()); + } else if (remoteMechanism.equalsIgnoreCase("ANONYMOUS")) { + found.add(new AnonymousMechanism()); + } else if (remoteMechanism.equalsIgnoreCase("CRAM-MD5")) { + found.add(new CramMD5Mechanism()); + } else { + LOG.debug("Unknown remote mechanism {}, skipping", remoteMechanism); + } + } + + if (!found.isEmpty()) { + // Sorts by priority using Mechanism comparison and return the last value in + // list which is the Mechanism deemed to be the highest priority match. + Collections.sort(found); + match = found.get(found.size() - 1); + } + + LOG.info("Best match for SASL auth was: {}", match); + + return match; + } + + private void handleSaslStep() throws SecurityException { + try { + if (sasl.pending() != 0) { + byte[] challenge = new byte[sasl.pending()]; + sasl.recv(challenge, 0, challenge.length); + byte[] response = mechanism.getChallengeResponse(challenge); + sasl.send(response, 0, response.length); + } + } catch (SaslException se) { + // TODO - Better error message. + SecurityException jmsse = new SecurityException("Exception while processing SASL step."); + jmsse.initCause(se); + throw jmsse; + } + } + + private void handleSaslFail() throws SecurityException { + // TODO - Better error message. + throw new SecurityException("Client failed to authenticate"); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java new file mode 100644 index 0000000..7327ba6 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +/** + * Defines a result interface for Asynchronous operations. + */ +public interface AsyncResult { + + /** + * If the operation fails this method is invoked with the Exception + * that caused the failure. + * + * @param result + * The error that resulted in this asynchronous operation failing. + */ + void onFailure(Throwable result); + + /** + * If the operation succeeds the resulting value produced is set to null and + * the waiting parties are signaled. + */ + void onSuccess(); + + /** + * Returns true if the AsyncResult has completed. The task is considered complete + * regardless if it succeeded or failed. + * + * @return returns true if the asynchronous operation has completed. + */ + boolean isComplete(); + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java new file mode 100644 index 0000000..9f83a1d --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.util.IOExceptionSupport; + +/** + * Asynchronous Client Future class. + */ +public class ClientFuture extends WrappedAsyncResult { + + protected final CountDownLatch latch = new CountDownLatch(1); + protected Throwable error; + + public ClientFuture() { + super(null); + } + + public ClientFuture(AsyncResult watcher) { + super(watcher); + } + + @Override + public boolean isComplete() { + return latch.getCount() == 0; + } + + @Override + public void onFailure(Throwable result) { + error = result; + latch.countDown(); + super.onFailure(result); + } + + @Override + public void onSuccess() { + latch.countDown(); + super.onSuccess(); + } + + /** + * Timed wait for a response to a pending operation. + * + * @param amount + * The amount of time to wait before abandoning the wait. + * @param unit + * The unit to use for this wait period. + * + * @throws IOException if an error occurs while waiting for the response. + */ + public void sync(long amount, TimeUnit unit) throws IOException { + try { + latch.await(amount, unit); + } catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + + failOnError(); + } + + /** + * Waits for a response to some pending operation. + * + * @throws IOException if an error occurs while waiting for the response. + */ + public void sync() throws IOException { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + + failOnError(); + } + + private void failOnError() throws IOException { + Throwable cause = error; + if (cause != null) { + throw IOExceptionSupport.create(cause); + } + } +}
