http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java new file mode 100644 index 0000000..28e38f2 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -0,0 +1,454 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.amqp.client; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activemq.transport.amqp.client.util.AsyncResult; +import org.apache.activemq.transport.amqp.client.util.ClientFuture; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Session; + +/** + * Session class that manages a Proton session endpoint. + */ +public class AmqpSession extends AmqpAbstractResource<Session> { + + private final AtomicLong receiverIdGenerator = new AtomicLong(); + private final AtomicLong senderIdGenerator = new AtomicLong(); + + private final AmqpConnection connection; + private final String sessionId; + private final AmqpTransactionContext txContext; + + /** + * Create a new session instance. + * + * @param connection The parent connection that created the session. + * @param sessionId The unique ID value assigned to this session. + */ + public AmqpSession(AmqpConnection connection, String sessionId) { + this.connection = connection; + this.sessionId = sessionId; + this.txContext = new AmqpTransactionContext(this); + } + + /** + * Create a sender instance using the given address + * + * @param address the address to which the sender will produce its messages. + * @return a newly created sender that is ready for use. + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address) throws Exception { + return createSender(address, false); + } + + /** + * Create a sender instance using the given address + * + * @param address the address to which the sender will produce its messages. + * @param presettle controls if the created sender produces message that have already been marked settled. 
+ * @return a newly created sender that is ready for use. + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address, boolean presettle) throws Exception { + checkClosed(); + + final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId()); + sender.setPresettle(presettle); + final ClientFuture request = new ClientFuture(); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + sender.setStateInspector(getStateInspector()); + sender.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return sender; + } + + /** + * Create a sender instance using the given Target + * + * @param target the caller created and configured Traget used to create the sender link. + * @return a newly created sender that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpSender createSender(Target target) throws Exception { + checkClosed(); + + final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId()); + final ClientFuture request = new ClientFuture(); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + sender.setStateInspector(getStateInspector()); + sender.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return sender; + } + + /** + * Create a receiver instance using the given address + * + * @param address the address to which the receiver will subscribe for its messages. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. 
+ */ + public AmqpReceiver createReceiver(String address) throws Exception { + return createReceiver(address, null, false); + } + + /** + * Create a receiver instance using the given address + * + * @param address the address to which the receiver will subscribe for its messages. + * @param selector the JMS selector to use for the subscription + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(String address, String selector) throws Exception { + return createReceiver(address, selector, false); + } + + /** + * Create a receiver instance using the given address + * + * @param address the address to which the receiver will subscribe for its messages. + * @param selector the JMS selector to use for the subscription + * @param noLocal should the subscription have messages from its connection filtered. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(String address, String selector, boolean noLocal) throws Exception { + return createReceiver(address, selector, noLocal, false); + } + + /** + * Create a receiver instance using the given address + * + * @param address the address to which the receiver will subscribe for its messages. + * @param selector the JMS selector to use for the subscription + * @param noLocal should the subscription have messages from its connection filtered. + * @param presettle should the receiver be created with a settled sender mode. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. 
+ */ + public AmqpReceiver createReceiver(String address, + String selector, + boolean noLocal, + boolean presettle) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId()); + + receiver.setNoLocal(noLocal); + receiver.setPresettle(presettle); + if (selector != null && !selector.isEmpty()) { + receiver.setSelector(selector); + } + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + + /** + * Create a receiver instance using the given Source + * + * @param source the caller created and configured Source used to create the receiver link. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createReceiver(Source source) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId()); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + + /** + * Create a receiver instance using the given address that creates a durable subscription. + * + * @param address the address to which the receiver will subscribe for its messages. + * @param subscriptionName the name of the subscription that is being created. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. 
+ */ + public AmqpReceiver createDurableReceiver(String address, String subscriptionName) throws Exception { + return createDurableReceiver(address, subscriptionName, null, false); + } + + /** + * Create a receiver instance using the given address that creates a durable subscription. + * + * @param address the address to which the receiver will subscribe for its messages. + * @param subscriptionName the name of the subscription that is being created. + * @param selector the JMS selector to use for the subscription + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createDurableReceiver(String address, + String subscriptionName, + String selector) throws Exception { + return createDurableReceiver(address, subscriptionName, selector, false); + } + + /** + * Create a receiver instance using the given address that creates a durable subscription. + * + * @param address the address to which the receiver will subscribe for its messages. + * @param subscriptionName the name of the subscription that is being created. + * @param selector the JMS selector to use for the subscription + * @param noLocal should the subscription have messages from its connection filtered. + * @return a newly created receiver that is ready for use. + * @throws Exception if an error occurs while creating the receiver. 
+ */ + public AmqpReceiver createDurableReceiver(String address, + String subscriptionName, + String selector, + boolean noLocal) throws Exception { + checkClosed(); + + if (subscriptionName == null || subscriptionName.isEmpty()) { + throw new IllegalArgumentException("subscription name must not be null or empty."); + } + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId()); + receiver.setSubscriptionName(subscriptionName); + receiver.setNoLocal(noLocal); + if (selector != null && !selector.isEmpty()) { + receiver.setSelector(selector); + } + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + + /** + * Create a receiver instance using the given address that creates a durable subscription. + * + * @param subscriptionName the name of the subscription that should be queried for on the remote.. + * @return a newly created receiver that is ready for use if the subscription exists. + * @throws Exception if an error occurs while creating the receiver. 
+ */ + public AmqpReceiver lookupSubscription(String subscriptionName) throws Exception { + checkClosed(); + + if (subscriptionName == null || subscriptionName.isEmpty()) { + throw new IllegalArgumentException("subscription name must not be null or empty."); + } + + final ClientFuture request = new ClientFuture(); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, (String) null, getNextReceiverId()); + receiver.setSubscriptionName(subscriptionName); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.setStateInspector(getStateInspector()); + receiver.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return receiver; + } + + /** + * @return this session's parent AmqpConnection. + */ + public AmqpConnection getConnection() { + return connection; + } + + public Session getSession() { + return new UnmodifiableSession(getEndpoint()); + } + + public boolean isInTransaction() { + return txContext.isInTransaction(); + } + + @Override + public String toString() { + return "AmqpSession { " + sessionId + " }"; + } + + //----- Session Transaction Methods --------------------------------------// + + /** + * Starts a new transaction associated with this session. + * + * @throws Exception if an error occurs starting a new Transaction. + */ + public void begin() throws Exception { + if (txContext.isInTransaction()) { + throw new javax.jms.IllegalStateException("Session already has an active transaction"); + } + + txContext.begin(); + } + + /** + * Commit the current transaction associated with this session. + * + * @throws Exception if an error occurs committing the Transaction. 
+ */ + public void commit() throws Exception { + if (!txContext.isInTransaction()) { + throw new javax.jms.IllegalStateException("Commit called on Session that does not have an active transaction"); + } + + txContext.commit(); + } + + /** + * Roll back the current transaction associated with this session. + * + * @throws Exception if an error occurs rolling back the Transaction. + */ + public void rollback() throws Exception { + if (!txContext.isInTransaction()) { + throw new javax.jms.IllegalStateException("Rollback called on Session that does not have an active transaction"); + } + + txContext.rollback(); + } + + //----- Internal access used to manage resources -------------------------// + + ScheduledExecutorService getScheduler() { + return connection.getScheduler(); + } + + Connection getProtonConnection() { + return connection.getProtonConnection(); + } + + void pumpToProtonTransport(AsyncResult request) { + connection.pumpToProtonTransport(request); + } + + AmqpTransactionId getTransactionId() { + return txContext.getTransactionId(); + } + + AmqpTransactionContext getTransactionContext() { + return txContext; + } + + //----- Private implementation details -----------------------------------// + + @Override + protected void doOpenInspection() { + try { + getStateInspector().inspectOpenedResource(getSession()); + } + catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + + @Override + protected void doClosedInspection() { + try { + getStateInspector().inspectClosedResource(getSession()); + } + catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + + private String getNextSenderId() { + return sessionId + ":" + senderIdGenerator.incrementAndGet(); + } + + private String getNextReceiverId() { + return sessionId + ":" + receiverIdGenerator.incrementAndGet(); + } + + private void checkClosed() { + if (isClosed() || connection.isClosed()) { + throw new IllegalStateException("Session is 
already closed"); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java new file mode 100644 index 0000000..c9ee57b --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSupport.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.amqp.client; + +import javax.jms.InvalidClientIDException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.JMSSecurityException; +import javax.jms.ResourceAllocationException; +import javax.jms.TransactionRolledBackException; +import java.io.IOException; +import java.util.Map; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.transaction.TransactionErrors; +import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.apache.qpid.proton.amqp.transport.ConnectionError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; + +public class AmqpSupport { + + // Symbols used for connection capabilities + public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container"); + public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + + // Symbols used to announce connection error information + public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); + public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field"); + public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id"); + + // Symbols used to announce connection redirect ErrorCondition 'info' + public static final Symbol PORT = Symbol.valueOf("port"); + public static final Symbol NETWORK_HOST = Symbol.valueOf("network-host"); + public static final Symbol OPEN_HOSTNAME = Symbol.valueOf("hostname"); + + // Symbols used for connection properties + public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); + public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); + + public static final Symbol PRODUCT = Symbol.valueOf("product"); + public static final Symbol VERSION = Symbol.valueOf("version"); + public 
static final Symbol PLATFORM = Symbol.valueOf("platform"); + + // Symbols used for receivers. + public static final Symbol COPY = Symbol.getSymbol("copy"); + public static final Symbol NO_LOCAL_SYMBOL = Symbol.valueOf("no-local"); + public static final Symbol SELECTOR_SYMBOL = Symbol.valueOf("jms-selector"); + + // Delivery states + public static final Rejected REJECTED = new Rejected(); + public static final Modified MODIFIED_FAILED = new Modified(); + public static final Modified MODIFIED_FAILED_UNDELIVERABLE = new Modified(); + + // Temporary Destination constants + public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + public static final String TEMP_QUEUE_CREATOR = "temp-queue-creator:"; + public static final String TEMP_TOPIC_CREATOR = "temp-topic-creator:"; + + //----- Static initializer -----------------------------------------------// + + static { + MODIFIED_FAILED.setDeliveryFailed(true); + + MODIFIED_FAILED_UNDELIVERABLE.setDeliveryFailed(true); + MODIFIED_FAILED_UNDELIVERABLE.setUndeliverableHere(true); + } + + //----- Utility Methods --------------------------------------------------// + + /** + * Given an ErrorCondition instance create a new Exception that best matches + * the error type. + * + * @param errorCondition The ErrorCondition returned from the remote peer. + * @return a new Exception instance that best matches the ErrorCondition value. 
+ */ + public static Exception convertToException(ErrorCondition errorCondition) { + Exception remoteError = null; + + if (errorCondition != null && errorCondition.getCondition() != null) { + Symbol error = errorCondition.getCondition(); + String message = extractErrorMessage(errorCondition); + + if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) { + remoteError = new JMSSecurityException(message); + } + else if (error.equals(AmqpError.RESOURCE_LIMIT_EXCEEDED)) { + remoteError = new ResourceAllocationException(message); + } + else if (error.equals(AmqpError.NOT_FOUND)) { + remoteError = new InvalidDestinationException(message); + } + else if (error.equals(TransactionErrors.TRANSACTION_ROLLBACK)) { + remoteError = new TransactionRolledBackException(message); + } + else if (error.equals(ConnectionError.REDIRECT)) { + remoteError = createRedirectException(error, message, errorCondition); + } + else if (error.equals(AmqpError.INVALID_FIELD)) { + Map<?, ?> info = errorCondition.getInfo(); + if (info != null && CONTAINER_ID.equals(info.get(INVALID_FIELD))) { + remoteError = new InvalidClientIDException(message); + } + else { + remoteError = new JMSException(message); + } + } + else { + remoteError = new JMSException(message); + } + } + else { + remoteError = new JMSException("Unknown error from remote peer"); + } + + return remoteError; + } + + /** + * Attempt to read and return the embedded error message in the given ErrorCondition + * object. If no message can be extracted a generic message is returned. + * + * @param errorCondition The ErrorCondition to extract the error message from. + * @return an error message extracted from the given ErrorCondition. 
+ */ + public static String extractErrorMessage(ErrorCondition errorCondition) { + String message = "Received error from remote peer without description"; + if (errorCondition != null) { + if (errorCondition.getDescription() != null && !errorCondition.getDescription().isEmpty()) { + message = errorCondition.getDescription(); + } + + Symbol condition = errorCondition.getCondition(); + if (condition != null) { + message = message + " [condition = " + condition + "]"; + } + } + + return message; + } + + /** + * When a redirect type exception is received this method is called to create the + * appropriate redirect exception type containing the error details needed. + * + * @param error the Symbol that defines the redirection error type. + * @param message the basic error message that should used or amended for the returned exception. + * @param condition the ErrorCondition that describes the redirection. + * @return an Exception that captures the details of the redirection error. + */ + public static Exception createRedirectException(Symbol error, String message, ErrorCondition condition) { + Exception result = null; + Map<?, ?> info = condition.getInfo(); + + if (info == null) { + result = new IOException(message + " : Redirection information not set."); + } + else { + String hostname = (String) info.get(OPEN_HOSTNAME); + + String networkHost = (String) info.get(NETWORK_HOST); + if (networkHost == null || networkHost.isEmpty()) { + result = new IOException(message + " : Redirection information not set."); + } + + int port = 0; + try { + port = Integer.valueOf(info.get(PORT).toString()); + } + catch (Exception ex) { + result = new IOException(message + " : Redirection information not set."); + } + + result = new AmqpRedirectedException(message, hostname, networkHost, port); + } + + return result; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java new file mode 100644 index 0000000..dcf23d2 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.amqp.client; + +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.Set; + +import org.apache.activemq.transport.amqp.client.util.AsyncResult; +import org.apache.activemq.transport.amqp.client.util.ClientFuture; +import org.apache.activemq.transport.amqp.client.util.ClientFutureSynchronization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Defines a context under which resources in a given session + * will operate inside transaction scoped boundaries. + */ +public class AmqpTransactionContext { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class); + + private final AmqpSession session; + private final Set<AmqpReceiver> txReceivers = new LinkedHashSet<>(); + + private AmqpTransactionCoordinator coordinator; + private AmqpTransactionId transactionId; + + public AmqpTransactionContext(AmqpSession session) { + this.session = session; + } + + /** + * Begins a new transaction scoped to the target session. + * + * @param txId The transaction Id to use for this new transaction. + * @throws Exception if an error occurs while starting the transaction. 
+ */ + public void begin() throws Exception { + if (transactionId != null) { + throw new IOException("Begin called while a TX is still Active."); + } + + final AmqpTransactionId txId = session.getConnection().getNextTransactionId(); + final ClientFuture request = new ClientFuture(new ClientFutureSynchronization() { + + @Override + public void onPendingSuccess() { + transactionId = txId; + } + + @Override + public void onPendingFailure(Throwable cause) { + transactionId = null; + } + }); + + LOG.info("Attempting to Begin TX:[{}]", txId); + + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + if (coordinator == null || coordinator.isClosed()) { + LOG.info("Creating new Coordinator for TX:[{}]", txId); + coordinator = new AmqpTransactionCoordinator(session); + coordinator.open(new AsyncResult() { + + @Override + public void onSuccess() { + try { + LOG.info("Attempting to declare TX:[{}]", txId); + coordinator.declare(txId, request); + } + catch (Exception e) { + request.onFailure(e); + } + } + + @Override + public void onFailure(Throwable result) { + request.onFailure(result); + } + + @Override + public boolean isComplete() { + return request.isComplete(); + } + }); + } + else { + try { + LOG.info("Attempting to declare TX:[{}]", txId); + coordinator.declare(txId, request); + } + catch (Exception e) { + request.onFailure(e); + } + } + + session.pumpToProtonTransport(request); + } + }); + + request.sync(); + } + + /** + * Commit this transaction which then ends the lifetime of the transacted operation. 
+ * + * @throws Exception if an error occurs while performing the commit + */ + public void commit() throws Exception { + if (transactionId == null) { + throw new IllegalStateException("Commit called with no active Transaction."); + } + + preCommit(); + + final ClientFuture request = new ClientFuture(new ClientFutureSynchronization() { + + @Override + public void onPendingSuccess() { + transactionId = null; + postCommit(); + } + + @Override + public void onPendingFailure(Throwable cause) { + transactionId = null; + postCommit(); + } + }); + + LOG.debug("Commit on TX[{}] initiated", transactionId); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + try { + LOG.info("Attempting to commit TX:[{}]", transactionId); + coordinator.discharge(transactionId, request, true); + session.pumpToProtonTransport(request); + } + catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + /** + * Rollback any transacted work performed under the current transaction. + * + * @throws Exception if an error occurs during the rollback operation. 
+ */ + public void rollback() throws Exception { + if (transactionId == null) { + throw new IllegalStateException("Rollback called with no active Transaction."); + } + + preRollback(); + + final ClientFuture request = new ClientFuture(new ClientFutureSynchronization() { + + @Override + public void onPendingSuccess() { + transactionId = null; + postRollback(); + } + + @Override + public void onPendingFailure(Throwable cause) { + transactionId = null; + postRollback(); + } + }); + + LOG.debug("Rollback on TX[{}] initiated", transactionId); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + try { + LOG.info("Attempting to roll back TX:[{}]", transactionId); + coordinator.discharge(transactionId, request, false); + session.pumpToProtonTransport(request); + } + catch (Exception e) { + request.onFailure(e); + } + } + }); + + request.sync(); + } + + //----- Internal access to context properties ----------------------------// + + AmqpTransactionCoordinator getCoordinator() { + return coordinator; + } + + AmqpTransactionId getTransactionId() { + return transactionId; + } + + boolean isInTransaction() { + return transactionId != null; + } + + void registerTxConsumer(AmqpReceiver consumer) { + txReceivers.add(consumer); + } + + //----- Transaction pre / post completion --------------------------------// + + private void preCommit() { + for (AmqpReceiver receiver : txReceivers) { + receiver.preCommit(); + } + } + + private void preRollback() { + for (AmqpReceiver receiver : txReceivers) { + receiver.preRollback(); + } + } + + private void postCommit() { + for (AmqpReceiver receiver : txReceivers) { + receiver.postCommit(); + } + + txReceivers.clear(); + } + + private void postRollback() { + for (AmqpReceiver receiver : txReceivers) { + receiver.postRollback(); + } + + txReceivers.clear(); + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java new file mode 100644 index 0000000..aded162 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionCoordinator.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.amqp.client; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.TransactionRolledBackException; +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.transport.amqp.client.util.AsyncResult; +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.transaction.Coordinator; +import org.apache.qpid.proton.amqp.transaction.Declare; +import org.apache.qpid.proton.amqp.transaction.Declared; +import org.apache.qpid.proton.amqp.transaction.Discharge; +import org.apache.qpid.proton.amqp.transaction.TxnCapability; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.message.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents the AMQP Transaction coordinator link used by the transaction context + * of a session to control the lifetime of a given transaction. 
+ */ +public class AmqpTransactionCoordinator extends AmqpAbstractResource<Sender> { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class); + + private final byte[] OUTBOUND_BUFFER = new byte[64]; + + private final AmqpSession session; + private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(); + + private List<Delivery> pendingDeliveries = new LinkedList<>(); + private Map<AmqpTransactionId, AsyncResult> pendingRequests = new HashMap<>(); + + public AmqpTransactionCoordinator(AmqpSession session) { + this.session = session; + } + + @Override + public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + try { + Iterator<Delivery> deliveries = pendingDeliveries.iterator(); + while (deliveries.hasNext()) { + Delivery pendingDelivery = deliveries.next(); + if (!pendingDelivery.remotelySettled()) { + continue; + } + + DeliveryState state = pendingDelivery.getRemoteState(); + AmqpTransactionId txId = (AmqpTransactionId) pendingDelivery.getContext(); + AsyncResult pendingRequest = pendingRequests.get(txId); + + if (pendingRequest == null) { + throw new IllegalStateException("Pending tx operation with no pending request"); + } + + if (state instanceof Declared) { + LOG.debug("New TX started: {}", txId.getTxId()); + Declared declared = (Declared) state; + txId.setRemoteTxId(declared.getTxnId()); + pendingRequest.onSuccess(); + } + else if (state instanceof Rejected) { + LOG.debug("Last TX request failed: {}", txId.getTxId()); + Rejected rejected = (Rejected) state; + Exception cause = AmqpSupport.convertToException(rejected.getError()); + JMSException failureCause = null; + if (txId.isCommit()) { + failureCause = new TransactionRolledBackException(cause.getMessage()); + } + else { + failureCause = new JMSException(cause.getMessage()); + } + + pendingRequest.onFailure(failureCause); + } + else { + LOG.debug("Last TX request succeeded: {}", txId.getTxId()); + 
pendingRequest.onSuccess(); + } + + // Clear state data + pendingDelivery.settle(); + pendingRequests.remove(txId); + deliveries.remove(); + } + + super.processDeliveryUpdates(connection); + } + catch (Exception e) { + throw IOExceptionSupport.create(e); + } + } + + public void declare(AmqpTransactionId txId, AsyncResult request) throws Exception { + if (txId.getRemoteTxId() != null) { + throw new IllegalStateException("Declar called while a TX is still Active."); + } + + if (isClosed()) { + request.onFailure(new JMSException("Cannot start new transaction: Coordinator remotely closed")); + return; + } + + Message message = Message.Factory.create(); + Declare declare = new Declare(); + message.setBody(new AmqpValue(declare)); + + Delivery pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag()); + pendingDelivery.setContext(txId); + + // Store away for completion + pendingDeliveries.add(pendingDelivery); + pendingRequests.put(txId, request); + + sendTxCommand(message); + } + + public void discharge(AmqpTransactionId txId, AsyncResult request, boolean commit) throws Exception { + + if (isClosed()) { + Exception failureCause = null; + + if (commit) { + failureCause = new TransactionRolledBackException("Transaction inbout: Coordinator remotely closed"); + } + else { + failureCause = new JMSException("Rollback cannot complete: Coordinator remotely closed"); + } + + request.onFailure(failureCause); + return; + } + + // Store the context of this action in the transaction ID for later completion. + txId.setState(commit ? 
AmqpTransactionId.COMMIT_MARKER : AmqpTransactionId.ROLLBACK_MARKER); + + Message message = Message.Factory.create(); + Discharge discharge = new Discharge(); + discharge.setFail(!commit); + discharge.setTxnId(txId.getRemoteTxId()); + message.setBody(new AmqpValue(discharge)); + + Delivery pendingDelivery = getEndpoint().delivery(tagGenerator.getNextTag()); + pendingDelivery.setContext(txId); + + // Store away for completion + pendingDeliveries.add(pendingDelivery); + pendingRequests.put(txId, request); + + sendTxCommand(message); + } + + //----- Base class overrides ---------------------------------------------// + + @Override + public void remotelyClosed(AmqpConnection connection) { + + Exception txnError = AmqpSupport.convertToException(getEndpoint().getRemoteCondition()); + + // Alert any pending operation that the link failed to complete the pending + // begin / commit / rollback operation. + for (AsyncResult pendingRequest : pendingRequests.values()) { + pendingRequest.onFailure(txnError); + } + + // Purge linkages to pending operations. + pendingDeliveries.clear(); + pendingRequests.clear(); + + // Override the base class version because we do not want to propagate + // an error up to the client if remote close happens as that is an + // acceptable way for the remote to indicate the discharge could not + // be applied. 
+ + if (getEndpoint() != null) { + getEndpoint().close(); + getEndpoint().free(); + } + + LOG.debug("Transaction Coordinator link {} was remotely closed", getEndpoint()); + } + + //----- Internal implementation ------------------------------------------// + + private void sendTxCommand(Message message) throws IOException { + int encodedSize = 0; + byte[] buffer = OUTBOUND_BUFFER; + while (true) { + try { + encodedSize = message.encode(buffer, 0, buffer.length); + break; + } + catch (BufferOverflowException e) { + buffer = new byte[buffer.length * 2]; + } + } + + Sender sender = getEndpoint(); + sender.send(buffer, 0, encodedSize); + sender.advance(); + } + + @Override + protected void doOpen() { + Coordinator coordinator = new Coordinator(); + coordinator.setCapabilities(TxnCapability.LOCAL_TXN); + Source source = new Source(); + + String coordinatorName = "qpid-jms:coordinator:" + session.getConnection().getConnectionId(); + + Sender sender = session.getEndpoint().sender(coordinatorName); + sender.setSource(source); + sender.setTarget(coordinator); + sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + setEndpoint(sender); + + super.doOpen(); + } + + @Override + protected void doOpenInspection() { + // TODO + } + + @Override + protected void doClosedInspection() { + // TODO + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java new file mode 100644 index 0000000..5dcdfe1 --- /dev/null +++ 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionId.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.Binary; + +/** + * Identifier for a transaction that pairs the locally assigned id with the remote + * transaction id and records which operation (declare, commit or rollback) the id is + * currently being used for. Equality and hash code are based on the local id only. + */ +public class AmqpTransactionId { + + /* Values accepted by setState(int); a new id starts in the declare state. */ + public static final int DECLARE_MARKER = 1; + public static final int ROLLBACK_MARKER = 2; + public static final int COMMIT_MARKER = 3; + + private final String txId; + private Binary remoteTxId; + private int state = DECLARE_MARKER; + + /** + * @param txId the local id of the transaction; this is the only value used by equals and hashCode. + */ + public AmqpTransactionId(String txId) { + this.txId = txId; + } + + /** + * @return true while the state is the declare marker, which is the initial state. + */ + public boolean isDeclare() { + return state == DECLARE_MARKER; + } + + /** + * @return true if the state was set to the commit marker. + */ + public boolean isCommit() { + return state == COMMIT_MARKER; + } + + /** + * @return true if the state was set to the rollback marker. + */ + public boolean isRollback() { + return state == ROLLBACK_MARKER; + } + + /** + * Records the operation this id is being used for. The value is stored as given and is not validated against the marker constants. + * + * @param state one of the marker constants of this class. + */ + public void setState(int state) { + this.state = state; + } + + /** + * @return the local transaction id given at construction. + */ + public String getTxId() { + return txId; + } + + /** + * @return the remote transaction id, or null if none has been set. + */ + public Binary getRemoteTxId() { + return remoteTxId; + } + + /** + * @param remoteTxId the transaction id to associate with this local id. + */ + public void setRemoteTxId(Binary remoteTxId) { + this.remoteTxId = remoteTxId; + } + + /* Hash code and equality deliberately ignore the remote id and the state, so both can change while the id is used as a map key. */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + 
result = prime * result + ((txId == null) ? 0 : txId.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + + AmqpTransactionId other = (AmqpTransactionId) obj; + if (txId == null) { + if (other.txId != null) { + return false; + } + } + else if (!txId.equals(other.txId)) { + return false; + } + + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java new file mode 100644 index 0000000..85ee07f --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransferTagGenerator.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.amqp.client; + +import java.io.UnsupportedEncodingException; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * Utility class that can generate and if enabled pool the binary tag values + * used to identify transfers over an AMQP link. This class performs no + * synchronization of its own. + */ +public final class AmqpTransferTagGenerator { + + public static final int DEFAULT_TAG_POOL_SIZE = 1024; + + private long nextTagId; + private int maxPoolSize = DEFAULT_TAG_POOL_SIZE; + + /* Returned tags waiting for reuse, or null when pooling is disabled. Arrays hash and compare by identity, so each returned array instance is a distinct entry. */ + private final Set<byte[]> tagPool; + + /** + * Creates a generator that does not pool tags. + */ + public AmqpTransferTagGenerator() { + this(false); + } + + /** + * Creates a generator with or without a pool of reusable tags. + * + * @param pool true to keep returned tags for reuse, false to always create new tags. + */ + public AmqpTransferTagGenerator(boolean pool) { + if (pool) { + this.tagPool = new LinkedHashSet<>(); + } + else { + this.tagPool = null; + } + } + + /** + * Retrieves the next available tag. A pooled tag is handed out first, in the order + * the tags were returned; otherwise the tag is the hexadecimal text of an incrementing + * counter encoded as UTF-8. + * + * @return a new or unused tag depending on the pool option. + */ + public byte[] getNextTag() { + byte[] rc; + if (tagPool != null && !tagPool.isEmpty()) { + final Iterator<byte[]> iterator = tagPool.iterator(); + rc = iterator.next(); + iterator.remove(); + } + else { + try { + rc = Long.toHexString(nextTagId++).getBytes("UTF-8"); + } + catch (UnsupportedEncodingException e) { + // This should never happen since we control the input. + throw new RuntimeException(e); + } + } + return rc; + } + + /** + * When used as a pooled cache of tags the unused tags should always be returned once + * the transfer has been settled. The tag is dropped when pooling is disabled or the + * pool already holds the maximum number of tags. + * + * @param data a previously borrowed tag that is no longer in use. + */ + public void returnTag(byte[] data) { + if (tagPool != null && tagPool.size() < maxPoolSize) { + tagPool.add(data); + } + } + + /** + * Gets the current max pool size value. + * + * @return the current max tag pool size. + */ + public int getMaxPoolSize() { + return maxPoolSize; + } + + /** + * Sets the max tag pool size. If the size is smaller than the current number + * of pooled tags the pool will drain over time until it matches the max. 
+ * + * @param maxPoolSize the maximum number of tags to hold in the pool. + */ + public void setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java new file mode 100644 index 0000000..8a4ce6b --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpUnknownFilterType.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedLong; + +/** + * A Described Type wrapper for an unsupported filter that the broker should ignore. 
+ */ +public class AmqpUnknownFilterType implements DescribedType { + + /* Shared instance. NOTE(review): the constant name and the symbol text below are spelled "unkown"; both are externally visible identifiers, so renaming them would change the API and the value sent on the wire. */ + public static final AmqpUnknownFilterType UNKOWN_FILTER = new AmqpUnknownFilterType(); + + /* Numeric and symbolic descriptor of the filter, plus an array holding both. */ + public static final UnsignedLong UNKNOWN_FILTER_CODE = UnsignedLong.valueOf(0x0000468C00000099L); + public static final Symbol UNKNOWN_FILTER_NAME = Symbol.valueOf("apache.org:unkown-filter:string"); + public static final Object[] UNKNOWN_FILTER_IDS = new Object[]{UNKNOWN_FILTER_CODE, UNKNOWN_FILTER_NAME}; + + private final String payload; + + /** + * Creates a filter whose described value is a fixed placeholder string. + */ + public AmqpUnknownFilterType() { + this.payload = "UnknownFilter{}"; + } + + /** + * @return the numeric descriptor code of this filter type. + */ + @Override + public Object getDescriptor() { + return UNKNOWN_FILTER_CODE; + } + + /** + * @return the placeholder string carried as the described value. + */ + @Override + public Object getDescribed() { + return this.payload; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java new file mode 100644 index 0000000..5f46cb6 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; + +/** + * Base class for a validation hook that is used in tests to check + * the state of a remote resource after a variety of lifecycle events. + * + * Every inspect method does nothing here; a subclass overrides the ones it + * needs and calls markAsInvalid to record a failure. + */ +public class AmqpValidator { + + /* A validator starts out valid; only the first failure message is kept (see markAsInvalid). */ + private boolean valid = true; + private String errorMessage; + + public void inspectOpenedResource(Connection connection) { + + } + + public void inspectOpenedResource(Session session) { + + } + + public void inspectOpenedResource(Sender sender) { + + } + + public void inspectOpenedResource(Receiver receiver) { + + } + + public void inspectClosedResource(Connection remoteConnection) { + + } + + public void inspectClosedResource(Session session) { + + } + + public void inspectClosedResource(Sender sender) { + + } + + public void inspectClosedResource(Receiver receiver) { + + } + + public void inspectDetachedResource(Sender sender) { + + } + + public void inspectDetachedResource(Receiver receiver) { + + } + + /** + * @return true until a failure has been recorded. + */ + public boolean isValid() { + return valid; + } + + protected void setValid(boolean valid) { + this.valid = valid; + } + + /** + * @return the recorded failure message, or null if none was set. + */ + public String getErrorMessage() { + return errorMessage; + } + + protected void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + /** + * Records a failure. Only the first call has an effect: once the validator is invalid, later messages are ignored. + * + * @param errorMessage the description of the failed check. + */ + protected void markAsInvalid(String errorMessage) { + if (valid) { + setValid(false); + setErrorMessage(errorMessage); + } + } + + /** + * Throws an AssertionError carrying the recorded message if a failure has been recorded. + */ + public 
void assertValid() { + if (!isValid()) { + throw new AssertionError(errorMessage); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java new file mode 100644 index 0000000..011fba7 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AbstractMechanism.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +import java.util.HashMap; +import java.util.Map; + +/** + * Base class for SASL Authentication Mechanism that implements the basic + * methods of a Mechanism class. 
+ */ +public abstract class AbstractMechanism implements Mechanism { + + /* Shared zero-length response for mechanisms that have nothing to send. */ + protected static final byte[] EMPTY = new byte[0]; + + private String username; + private String password; + private String authzid; + private Map<String, Object> properties = new HashMap<>(); + + /** + * Orders mechanisms by ascending priority value. + * + * @param other the mechanism to compare against. + * @return -1, 0 or 1 as this priority is lower than, equal to or higher than the other one. + */ + @Override + public int compareTo(Mechanism other) { + + if (getPriority() < other.getPriority()) { + return -1; + } + else if (getPriority() > other.getPriority()) { + return 1; + } + + return 0; + } + + @Override + public void setUsername(String value) { + this.username = value; + } + + @Override + public String getUsername() { + return username; + } + + @Override + public void setPassword(String value) { + this.password = value; + } + + @Override + public String getPassword() { + return this.password; + } + + /** + * Replaces the property map. The given map is stored as is, without copying. + */ + @Override + public void setProperties(Map<String, Object> properties) { + this.properties = properties; + } + + /** + * @return the stored property map itself, not a copy; an empty map until one is set. + */ + @Override + public Map<String, Object> getProperties() { + return this.properties; + } + + /** + * @return the mechanism name prefixed with "SASL-". + */ + @Override + public String toString() { + return "SASL-" + getName(); + } + + @Override + public String getAuthzid() { + return authzid; + } + + @Override + public void setAuthzid(String authzid) { + this.authzid = authzid; + } + + /** + * Default applicability check that accepts any credentials. + */ + @Override + public boolean isApplicable(String username, String password) { + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java new file mode 100644 index 0000000..c3d36aa --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/AnonymousMechanism.java @@ -0,0 
+1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +/** + * Implements the Anonymous SASL authentication mechanism. + * + * No credentials are sent: both the initial response and every challenge + * response are empty. + */ +public class AnonymousMechanism extends AbstractMechanism { + + /** + * @return the shared empty response. + */ + @Override + public byte[] getInitialResponse() { + return EMPTY; + } + + /** + * @param challenge the challenge from the peer; it is ignored. + * @return the shared empty response. + */ + @Override + public byte[] getChallengeResponse(byte[] challenge) { + return EMPTY; + } + + /** + * @return the value of the lowest priority level. + */ + @Override + public int getPriority() { + return PRIORITY.LOWEST.getValue(); + } + + /** + * @return the mechanism name "ANONYMOUS". + */ + @Override + public String getName() { + return "ANONYMOUS"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java new file mode 100644 index 0000000..4821314 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/CramMD5Mechanism.java @@ -0,0 
+1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.security.sasl.SaslException; +import java.io.UnsupportedEncodingException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; + +/** + * Implements the SASL CRAM-MD5 authentication Mechanism. + * + * The password itself is not sent; the response carries the user name and an HMAC-MD5 digest of the challenge keyed with the password. 
+ */ +public class CramMD5Mechanism extends AbstractMechanism { + + private static final String ASCII = "ASCII"; + private static final String HMACMD5 = "HMACMD5"; + /* Set once a challenge has been answered; later challenges then get an empty response. */ + private boolean sentResponse; + + @Override + public int getPriority() { + return PRIORITY.HIGH.getValue(); + } + + @Override + public String getName() { + return "CRAM-MD5"; + } + + /** + * @return the shared empty response; this mechanism only answers a challenge. + */ + @Override + public byte[] getInitialResponse() { + return EMPTY; + } + + /** + * Answers the first non-empty challenge with the user name, a space and the lowercase hexadecimal + * HMAC-MD5 of the challenge keyed with the password. The password and the reply text are encoded as ASCII. + * A null or empty challenge, or any challenge after the first answered one, yields the empty response. + * + * @param challenge the challenge bytes received from the peer. + * @return the encoded response, or the empty response as described above. + * @throws SaslException if the encoding, the key or the MAC algorithm cannot be used. + */ + @Override + public byte[] getChallengeResponse(byte[] challenge) throws SaslException { + if (!sentResponse && challenge != null && challenge.length != 0) { + try { + SecretKeySpec key = new SecretKeySpec(getPassword().getBytes(ASCII), HMACMD5); + Mac mac = Mac.getInstance(HMACMD5); + mac.init(key); + + byte[] bytes = mac.doFinal(challenge); + + StringBuffer hash = new StringBuffer(getUsername()); + hash.append(' '); + /* Append each digest byte as exactly two hexadecimal characters, padding with a leading zero. */ + for (int i = 0; i < bytes.length; i++) { + String hex = Integer.toHexString(0xFF & bytes[i]); + if (hex.length() == 1) { + hash.append('0'); + } + hash.append(hex); + } + + sentResponse = true; + return hash.toString().getBytes(ASCII); + } + catch (UnsupportedEncodingException e) { + throw new SaslException("Unable to utilise required encoding", e); + } + catch (InvalidKeyException e) { + throw new SaslException("Unable to utilise key", e); + } + catch (NoSuchAlgorithmException e) { + throw new SaslException("Unable to utilise required algorithm", e); + } + } + else { + return EMPTY; + } + } + + /** + * @return true only when both the user name and the password are non-null and non-empty. + */ + @Override + public boolean isApplicable(String username, String password) { + return username != null && username.length() > 0 && password != null && password.length() > 0; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java 
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java new file mode 100644 index 0000000..a79406f --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/Mechanism.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +import javax.security.sasl.SaslException; +import java.util.Map; + +/** + * Interface for all SASL authentication mechanism implementations. + */ +public interface Mechanism extends Comparable<Mechanism> { + + /** + * Relative priority values used to arrange the found SASL + * mechanisms in a preferred order where the level of security + * generally defines the preference. + */ + enum PRIORITY { + LOWEST(0), + LOW(1), + MEDIUM(2), + HIGH(3), + HIGHEST(4); + + private final int value; + + PRIORITY(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + }; + + /** + * @return the relative priority of this SASL mechanism. + */ + int getPriority(); + + /** + * @return the well known name of this SASL mechanism. 
+ */ + String getName(); + + /** + * @return the response buffer used to answer the initial SASL cycle. + * @throws SaslException if an error occurs computing the response. + */ + byte[] getInitialResponse() throws SaslException; + + /** + * Create a response based on a given challenge from the remote peer. + * + * @param challenge the challenge that this Mechanism should respond to. + * @return the response that answers the given challenge. + * @throws SaslException if an error occurs computing the response. + */ + byte[] getChallengeResponse(byte[] challenge) throws SaslException; + + /** + * Sets the user name value for this Mechanism. The Mechanism can ignore this + * value if it does not utilize user name in its authentication processing. + * + * @param value The user name given. + */ + void setUsername(String value); + + /** + * Returns the configured user name value for this Mechanism. + * + * @return the currently set user name value for this Mechanism. + */ + String getUsername(); + + /** + * Sets the password value for this Mechanism. The Mechanism can ignore this + * value if it does not utilize a password in its authentication processing. + * + * @param value The password given. + */ + void setPassword(String value); + + /** + * Returns the configured password value for this Mechanism. + * + * @return the currently set password value for this Mechanism. + */ + String getPassword(); + + /** + * Sets any additional Mechanism specific properties using a {@code Map<String, Object>} + * + * @param options the map of additional properties that this Mechanism should utilize. + */ + void setProperties(Map<String, Object> options); + + /** + * The currently set Properties for this Mechanism. + * + * @return the current set of configuration Properties for this Mechanism. + */ + Map<String, Object> getProperties(); + + /** + * Using the configured credentials, check if the mechanism applies or not. 
+ * + * @param username The user name that will be used with this mechanism + * @param password The password that will be used with this mechanism + * @return true if the mechanism works with the provided credentials, false otherwise. + */ + boolean isApplicable(String username, String password); + + /** + * Get the currently configured authorization identity (authzid). + * + * @return the currently set authorization identity. + */ + String getAuthzid(); + + /** + * Sets an authorization identity (authzid) that some mechanisms can use during the + * challenge response phase. + * + * @param authzid The authorization identity to use. + */ + void setAuthzid(String authzid); + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java new file mode 100644 index 0000000..d9b3ba3 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/PlainMechanism.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.sasl; + +/** + * Implements the SASL PLAIN authentication Mechanism. + * + * User name and Password values are sent without being encrypted. + */ +public class PlainMechanism extends AbstractMechanism { + + public static final String MECH_NAME = "PLAIN"; + + @Override + public int getPriority() { + return PRIORITY.MEDIUM.getValue(); + } + + @Override + public String getName() { + return MECH_NAME; + } + + @Override + public byte[] getInitialResponse() { + + String authzid = getAuthzid(); + String username = getUsername(); + String password = getPassword(); + + if (authzid == null) { + authzid = ""; + } + + if (username == null) { + username = ""; + } + + if (password == null) { + password = ""; + } + + byte[] authzidBytes = authzid.getBytes(); + byte[] usernameBytes = username.getBytes(); + byte[] passwordBytes = password.getBytes(); + byte[] data = new byte[authzidBytes.length + 1 + usernameBytes.length + 1 + passwordBytes.length]; + System.arraycopy(authzidBytes, 0, data, 0, authzidBytes.length); + System.arraycopy(usernameBytes, 0, data, 1 + authzidBytes.length, usernameBytes.length); + System.arraycopy(passwordBytes, 0, data, 2 + authzidBytes.length + usernameBytes.length, passwordBytes.length); + return data; + } + + @Override + public byte[] getChallengeResponse(byte[] challenge) { + return EMPTY; + } + + @Override + public boolean isApplicable(String username, String password) { + return username != null && username.length() > 0 && password != null && password.length() > 0; + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java new file mode 100644 index 0000000..5c25fae --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/sasl/SaslAuthenticator.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.amqp.client.sasl; + +import javax.security.sasl.SaslException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.qpid.proton.engine.Sasl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manage the SASL authentication process + */ +public class SaslAuthenticator { + + private static final Logger LOG = LoggerFactory.getLogger(SaslAuthenticator.class); + + private final Sasl sasl; + private final String username; + private final String password; + private final String authzid; + private Mechanism mechanism; + private String mechanismRestriction; + + /** + * Create the authenticator and initialize it. + * + * @param sasl The Proton SASL entry point this class will use to manage the authentication. + * @param username The user name that will be used to authenticate. + * @param password The password that will be used to authenticate. + * @param authzid The authzid used when authenticating (currently only with PLAIN) + * @param mechanismRestriction A particular mechanism to use (if offered by the server) or null to allow selection. + */ + public SaslAuthenticator(Sasl sasl, String username, String password, String authzid, String mechanismRestriction) { + this.sasl = sasl; + this.username = username; + this.password = password; + this.authzid = authzid; + this.mechanismRestriction = mechanismRestriction; + } + + /** + * Process the SASL authentication cycle until such time as an outcome is determine. This + * method must be called by the managing entity until the return value is true indicating a + * successful authentication or a JMSSecurityException is thrown indicating that the + * handshake failed. 
+ * + * @throws SecurityException + */ + public boolean authenticate() throws SecurityException { + switch (sasl.getState()) { + case PN_SASL_IDLE: + handleSaslInit(); + break; + case PN_SASL_STEP: + handleSaslStep(); + break; + case PN_SASL_FAIL: + handleSaslFail(); + break; + case PN_SASL_PASS: + return true; + default: + } + + return false; + } + + private void handleSaslInit() throws SecurityException { + try { + String[] remoteMechanisms = sasl.getRemoteMechanisms(); + if (remoteMechanisms != null && remoteMechanisms.length != 0) { + mechanism = findMatchingMechanism(remoteMechanisms); + if (mechanism != null) { + mechanism.setUsername(username); + mechanism.setPassword(password); + mechanism.setAuthzid(authzid); + // TODO - set additional options from URI. + // TODO - set a host value. + + sasl.setMechanisms(mechanism.getName()); + byte[] response = mechanism.getInitialResponse(); + if (response != null && response.length != 0) { + sasl.send(response, 0, response.length); + } + } + else { + // TODO - Better error message. + throw new SecurityException("Could not find a matching SASL mechanism for the remote peer."); + } + } + } + catch (SaslException se) { + // TODO - Better error message. + SecurityException jmsse = new SecurityException("Exception while processing SASL init."); + jmsse.initCause(se); + throw jmsse; + } + } + + private Mechanism findMatchingMechanism(String... 
remoteMechanisms) { + + Mechanism match = null; + List<Mechanism> found = new ArrayList<>(); + + for (String remoteMechanism : remoteMechanisms) { + if (mechanismRestriction != null && !mechanismRestriction.equals(remoteMechanism)) { + LOG.debug("Skipping {} mechanism because it is not the configured mechanism restriction {}", remoteMechanism, mechanismRestriction); + continue; + } + + Mechanism mechanism = null; + if (remoteMechanism.equalsIgnoreCase("PLAIN")) { + mechanism = new PlainMechanism(); + } + else if (remoteMechanism.equalsIgnoreCase("ANONYMOUS")) { + mechanism = new AnonymousMechanism(); + } + else if (remoteMechanism.equalsIgnoreCase("CRAM-MD5")) { + mechanism = new CramMD5Mechanism(); + } + else { + LOG.debug("Unknown remote mechanism {}, skipping", remoteMechanism); + continue; + } + + if (mechanism.isApplicable(username, password)) { + found.add(mechanism); + } + } + + if (!found.isEmpty()) { + // Sorts by priority using Mechanism comparison and return the last value in + // list which is the Mechanism deemed to be the highest priority match. + Collections.sort(found); + match = found.get(found.size() - 1); + } + + LOG.info("Best match for SASL auth was: {}", match); + + return match; + } + + private void handleSaslStep() throws SecurityException { + try { + if (sasl.pending() != 0) { + byte[] challenge = new byte[sasl.pending()]; + sasl.recv(challenge, 0, challenge.length); + byte[] response = mechanism.getChallengeResponse(challenge); + sasl.send(response, 0, response.length); + } + } + catch (SaslException se) { + // TODO - Better error message. + SecurityException jmsse = new SecurityException("Exception while processing SASL step."); + jmsse.initCause(se); + throw jmsse; + } + } + + private void handleSaslFail() throws SecurityException { + // TODO - Better error message. + throw new SecurityException("Client failed to authenticate"); + } +}
