ARTEMIS-637 Port 5.x AMQP test client
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/df41a60e Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/df41a60e Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/df41a60e Branch: refs/heads/master Commit: df41a60e21783f33f435ef3a9efa54f9dab146d7 Parents: 5695164 Author: Martyn Taylor <[email protected]> Authored: Fri Jul 15 18:03:31 2016 +0100 Committer: Andy Taylor <[email protected]> Committed: Wed Jul 20 10:33:44 2016 +0100 ---------------------------------------------------------------------- tests/artemis-test-support/pom.xml | 57 ++ .../transport/amqp/AmqpProtocolException.java | 62 ++ .../activemq/transport/amqp/AmqpSupport.java | 206 ++++ .../amqp/client/AmqpAbstractResource.java | 321 +++++++ .../transport/amqp/client/AmqpClient.java | 245 +++++ .../transport/amqp/client/AmqpConnection.java | 720 ++++++++++++++ .../amqp/client/AmqpConnectionListener.java | 31 + .../client/AmqpDefaultConnectionListener.java | 28 + .../transport/amqp/client/AmqpEventSink.java | 69 ++ .../amqp/client/AmqpJmsSelectorFilter.java | 48 + .../transport/amqp/client/AmqpMessage.java | 515 ++++++++++ .../amqp/client/AmqpNoLocalFilter.java | 45 + .../transport/amqp/client/AmqpReceiver.java | 946 +++++++++++++++++++ .../amqp/client/AmqpRedirectedException.java | 61 ++ .../transport/amqp/client/AmqpResource.java | 108 +++ .../transport/amqp/client/AmqpSender.java | 452 +++++++++ .../transport/amqp/client/AmqpSession.java | 454 +++++++++ .../transport/amqp/client/AmqpSupport.java | 195 ++++ .../amqp/client/AmqpTransactionContext.java | 261 +++++ .../amqp/client/AmqpTransactionCoordinator.java | 262 +++++ .../amqp/client/AmqpTransactionId.java | 98 ++ .../amqp/client/AmqpTransferTagGenerator.java | 104 ++ .../amqp/client/AmqpUnknownFilterType.java | 49 + .../transport/amqp/client/AmqpValidator.java | 101 ++ .../amqp/client/sasl/AbstractMechanism.java | 97 ++ .../amqp/client/sasl/AnonymousMechanism.java | 43 + .../amqp/client/sasl/CramMD5Mechanism.java | 94 ++ .../transport/amqp/client/sasl/Mechanism.java | 143 +++ .../amqp/client/sasl/PlainMechanism.java | 76 ++ .../amqp/client/sasl/SaslAuthenticator.java | 182 ++++ .../client/transport/NettyTcpTransport.java | 402 ++++++++ .../amqp/client/transport/NettyTransport.java | 52 + .../client/transport/NettyTransportFactory.java | 80 ++ .../transport/NettyTransportListener.java | 46 + .../client/transport/NettyTransportOptions.java | 177 ++++ .../transport/NettyTransportSslOptions.java | 284 ++++++ .../client/transport/NettyTransportSupport.java | 288 ++++++ .../amqp/client/transport/NettyWSTransport.java | 472 +++++++++ .../PartialPooledByteBufAllocator.java | 134 +++ .../client/transport/X509AliasKeyManager.java | 86 ++ .../transport/amqp/client/util/AsyncResult.java | 46 + .../amqp/client/util/ClientFuture.java | 110 +++ .../util/ClientFutureSynchronization.java | 30 + .../amqp/client/util/IOExceptionSupport.java | 45 + .../transport/amqp/client/util/IdGenerator.java | 274 ++++++ .../amqp/client/util/NoOpAsyncResult.java | 40 + .../amqp/client/util/PropertyUtil.java | 533 +++++++++++ .../amqp/client/util/StringArrayConverter.java | 64 ++ .../amqp/client/util/TypeConversionSupport.java | 218 +++++ .../client/util/UnmodifiableConnection.java | 202 ++++ .../amqp/client/util/UnmodifiableDelivery.java | 170 ++++ .../amqp/client/util/UnmodifiableLink.java | 276 ++++++ .../amqp/client/util/UnmodifiableReceiver.java | 59 ++ .../amqp/client/util/UnmodifiableSender.java | 45 + .../amqp/client/util/UnmodifiableSession.java | 150 +++ .../amqp/client/util/UnmodifiableTransport.java | 274 ++++++ .../amqp/client/util/WrappedAsyncResult.java | 59 ++ tests/integration-tests/pom.xml | 5 + tests/pom.xml | 8 + 59 files changed, 10702 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/pom.xml ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/pom.xml b/tests/artemis-test-support/pom.xml new file mode 100644 index 0000000..ec0c49d --- /dev/null +++ b/tests/artemis-test-support/pom.xml @@ -0,0 +1,57 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.activemq.tests</groupId> + <artifactId>artemis-tests-pom</artifactId> + <version>1.4.0-SNAPSHOT</version> + </parent> + + <artifactId>artemis-test-support</artifactId> + <packaging>jar</packaging> + <name>ActiveMQ Artemis Test Support</name> + + <properties> + <activemq.basedir>${project.basedir}/../..</activemq.basedir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-j</artifactId> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-jms-client</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java new file mode 100644 index 0000000..6e58417 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolException.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import java.io.IOException; + +public class AmqpProtocolException extends IOException { + + private static final long serialVersionUID = -2869735532997332242L; + + private final String symbolicName; + private final boolean fatal; + + public AmqpProtocolException() { + this(null); + } + + public AmqpProtocolException(String s) { + this(s, false); + } + + public AmqpProtocolException(String s, boolean fatal) { + this(s, fatal, null); + } + + public AmqpProtocolException(String s, String msg) { + this(s, msg, false, null); + } + + public AmqpProtocolException(String s, boolean fatal, Throwable cause) { + this("error", s, fatal, cause); + } + + public AmqpProtocolException(String symbolicName, String s, boolean fatal, Throwable cause) { + super(s); + this.symbolicName = symbolicName; + this.fatal = fatal; + initCause(cause); + } + + public boolean isFatal() { + return fatal; + } + + public String getSymbolicName() { + return symbolicName; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java new file mode 100644 index 0000000..cde4def --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/AmqpSupport.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import java.nio.ByteBuffer; +import java.util.AbstractMap; +import java.util.Map; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.amqp.transaction.Coordinator; +import org.fusesource.hawtbuf.Buffer; + +/** + * Set of useful methods and definitions used in the AMQP protocol handling + */ +public class AmqpSupport { + + // Identification values used to locating JMS selector types. + public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L); + public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string"); + public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[]{JMS_SELECTOR_CODE, JMS_SELECTOR_NAME}; + public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L); + public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:no-local-filter:list"); + public static final Object[] NO_LOCAL_FILTER_IDS = new Object[]{NO_LOCAL_CODE, NO_LOCAL_NAME}; + + // Capabilities used to identify destination type in some requests. + public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue"); + public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic"); + + // Symbols used to announce connection information to remote peer. + public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field"); + public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id"); + + // Symbols used to announce connection information to remote peer. + public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); + public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); + public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); + public static final Symbol PRODUCT = Symbol.valueOf("product"); + public static final Symbol VERSION = Symbol.valueOf("version"); + public static final Symbol PLATFORM = Symbol.valueOf("platform"); + + // Symbols used in configuration of newly opened links. + public static final Symbol COPY = Symbol.getSymbol("copy"); + + // Lifetime policy symbols + public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + + /** + * Search for a given Symbol in a given array of Symbol object. + * + * @param symbols the set of Symbols to search. + * @param key the value to try and find in the Symbol array. + * @return true if the key is found in the given Symbol array. + */ + public static boolean contains(Symbol[] symbols, Symbol key) { + if (symbols == null || symbols.length == 0) { + return false; + } + + for (Symbol symbol : symbols) { + if (symbol.equals(key)) { + return true; + } + } + + return false; + } + + /** + * Search for a particular filter using a set of known indentification values + * in the Map of filters. + * + * @param filters The filters map that should be searched. + * @param filterIds The aliases for the target filter to be located. + * @return the filter if found in the mapping or null if not found. + */ + public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> filters, Object[] filterIds) { + + if (filterIds == null || filterIds.length == 0) { + throw new IllegalArgumentException("Invalid empty Filter Ids array passed: "); + } + + if (filters == null || filters.isEmpty()) { + return null; + } + + for (Map.Entry<Symbol, Object> filter : filters.entrySet()) { + if (filter.getValue() instanceof DescribedType) { + DescribedType describedType = ((DescribedType) filter.getValue()); + Object descriptor = describedType.getDescriptor(); + + for (Object filterId : filterIds) { + if (descriptor.equals(filterId)) { + return new AbstractMap.SimpleImmutableEntry<>(filter.getKey(), describedType); + } + } + } + } + + return null; + } + + /** + * Conversion from Java ByteBuffer to a HawtBuf buffer. + * + * @param data the ByteBuffer instance to convert. + * @return a new HawtBuf buffer converted from the given ByteBuffer. + */ + public static Buffer toBuffer(ByteBuffer data) { + if (data == null) { + return null; + } + + Buffer rc; + + if (data.isDirect()) { + rc = new Buffer(data.remaining()); + data.get(rc.data); + } + else { + rc = new Buffer(data); + data.position(data.position() + data.remaining()); + } + + return rc; + } + + /** + * Given a long value, convert it to a byte array for marshalling. + * + * @param value the value to convert. + * @return a new byte array that holds the big endian value of the long. + */ + public static byte[] toBytes(long value) { + Buffer buffer = new Buffer(8); + buffer.bigEndianEditor().writeLong(value); + return buffer.data; + } + + /** + * Converts a Binary value to a long assuming that the contained value is + * stored in Big Endian encoding. + * + * @param value the Binary object whose payload is converted to a long. + * @return a long value constructed from the bytes of the Binary instance. + */ + public static long toLong(Binary value) { + Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength()); + return buffer.bigEndianEditor().readLong(); + } + + /** + * Given an AMQP endpoint, deduce the appropriate ActiveMQDestination type and create + * a new instance. By default if the endpoint address does not carry the standard prefix + * value then we default to a Queue type destination. If the endpoint is null or is an + * AMQP Coordinator type endpoint this method returns null to indicate no destination + * can be mapped. + * + * @param endpoint the AMQP endpoint to construct an ActiveMQDestination from. + * @return a new ActiveMQDestination that best matches the address of the given endpoint + * @throws AmqpProtocolException if an error occurs while deducing the destination type. + */ + public static ActiveMQDestination createDestination(Object endpoint) throws AmqpProtocolException { + if (endpoint == null) { + return null; + } + else if (endpoint instanceof Coordinator) { + return null; + } + else if (endpoint instanceof org.apache.qpid.proton.amqp.messaging.Terminus) { + org.apache.qpid.proton.amqp.messaging.Terminus terminus = (org.apache.qpid.proton.amqp.messaging.Terminus) endpoint; + if (terminus.getAddress() == null || terminus.getAddress().length() == 0) { + if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) { + throw new AmqpProtocolException("amqp:invalid-field", "source address not set"); + } + else { + throw new AmqpProtocolException("amqp:invalid-field", "target address not set"); + } + } + + return ActiveMQDestination.createDestination(terminus.getAddress(), ActiveMQDestination.QUEUE_TYPE); + } + else { + throw new RuntimeException("Unexpected terminus type: " + endpoint); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java new file mode 100644 index 0000000..b99c56b --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.io.IOException; + +import org.apache.activemq.transport.amqp.client.util.AsyncResult; +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.EndpointState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base for all AmqpResource implementations to extend. + * + * This abstract class wraps up the basic state management bits so that the concrete + * object don't have to reproduce it. Provides hooks for the subclasses to initialize + * and shutdown. + */ +public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpResource { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class); + + protected AsyncResult openRequest; + protected AsyncResult closeRequest; + + private AmqpValidator amqpStateInspector; + + private E endpoint; + + @Override + public void open(AsyncResult request) { + this.openRequest = request; + doOpen(); + getEndpoint().setContext(this); + } + + @Override + public boolean isOpen() { + return getEndpoint().getRemoteState() == EndpointState.ACTIVE; + } + + @Override + public void opened() { + if (this.openRequest != null) { + this.openRequest.onSuccess(); + this.openRequest = null; + } + } + + @Override + public void detach(AsyncResult request) { + // If already closed signal success or else the caller might never get notified. + if (getEndpoint().getLocalState() == EndpointState.CLOSED || getEndpoint().getRemoteState() == EndpointState.CLOSED) { + + if (getEndpoint().getLocalState() != EndpointState.CLOSED) { + doDetach(); + getEndpoint().free(); + } + + request.onSuccess(); + } + else { + this.closeRequest = request; + doDetach(); + } + } + + @Override + public void close(AsyncResult request) { + // If already closed signal success or else the caller might never get notified. + if (getEndpoint().getLocalState() == EndpointState.CLOSED || getEndpoint().getRemoteState() == EndpointState.CLOSED) { + + if (getEndpoint().getLocalState() != EndpointState.CLOSED) { + doClose(); + getEndpoint().free(); + } + + request.onSuccess(); + } + else { + this.closeRequest = request; + doClose(); + } + } + + @Override + public boolean isClosed() { + return getEndpoint().getLocalState() == EndpointState.CLOSED; + } + + @Override + public void closed() { + getEndpoint().close(); + getEndpoint().free(); + + if (this.closeRequest != null) { + this.closeRequest.onSuccess(); + this.closeRequest = null; + } + } + + @Override + public void failed() { + failed(new Exception("Remote request failed.")); + } + + @Override + public void failed(Exception cause) { + if (openRequest != null) { + if (endpoint != null) { + // TODO: if this is a producer/consumer link then we may only be detached, + // rather than fully closed, and should respond appropriately. + endpoint.close(); + } + openRequest.onFailure(cause); + openRequest = null; + } + + if (closeRequest != null) { + closeRequest.onFailure(cause); + closeRequest = null; + } + } + + @Override + public void remotelyClosed(AmqpConnection connection) { + Exception error = AmqpSupport.convertToException(getEndpoint().getRemoteCondition()); + + if (endpoint != null) { + // TODO: if this is a producer/consumer link then we may only be detached, + // rather than fully closed, and should respond appropriately. + endpoint.close(); + } + + LOG.info("Resource {} was remotely closed", this); + + connection.fireClientException(error); + } + + @Override + public void locallyClosed(AmqpConnection connection, Exception error) { + if (endpoint != null) { + // TODO: if this is a producer/consumer link then we may only be detached, + // rather than fully closed, and should respond appropriately. + endpoint.close(); + } + + LOG.info("Resource {} was locally closed", this); + + connection.fireClientException(error); + } + + public E getEndpoint() { + return this.endpoint; + } + + public void setEndpoint(E endpoint) { + this.endpoint = endpoint; + } + + public AmqpValidator getStateInspector() { + return amqpStateInspector; + } + + public void setStateInspector(AmqpValidator stateInspector) { + if (stateInspector == null) { + stateInspector = new AmqpValidator(); + } + + this.amqpStateInspector = stateInspector; + } + + public EndpointState getLocalState() { + if (getEndpoint() == null) { + return EndpointState.UNINITIALIZED; + } + return getEndpoint().getLocalState(); + } + + public EndpointState getRemoteState() { + if (getEndpoint() == null) { + return EndpointState.UNINITIALIZED; + } + return getEndpoint().getRemoteState(); + } + + public boolean hasRemoteError() { + return getEndpoint().getRemoteCondition().getCondition() != null; + } + + @Override + public void processRemoteOpen(AmqpConnection connection) throws IOException { + doOpenInspection(); + doOpenCompletion(); + } + + @Override + public void processRemoteDetach(AmqpConnection connection) throws IOException { + doDetachedInspection(); + if (isAwaitingClose()) { + LOG.debug("{} is now closed: ", this); + closed(); + } + else { + remotelyClosed(connection); + } + } + + @Override + public void processRemoteClose(AmqpConnection connection) throws IOException { + doClosedInspection(); + if (isAwaitingClose()) { + LOG.debug("{} is now closed: ", this); + closed(); + } + else if (isAwaitingOpen()) { + // Error on Open, create exception and signal failure. + LOG.warn("Open of {} failed: ", this); + Exception openError; + if (hasRemoteError()) { + openError = AmqpSupport.convertToException(getEndpoint().getRemoteCondition()); + } + else { + openError = getOpenAbortException(); + } + + failed(openError); + } + else { + remotelyClosed(connection); + } + } + + @Override + public void processDeliveryUpdates(AmqpConnection connection) throws IOException { + } + + @Override + public void processFlowUpdates(AmqpConnection connection) throws IOException { + } + + /** + * Perform the open operation on the managed endpoint. A subclass may + * override this method to provide additional open actions or configuration + * updates. + */ + protected void doOpen() { + getEndpoint().open(); + } + + /** + * Perform the close operation on the managed endpoint. A subclass may + * override this method to provide additional close actions or alter the + * standard close path such as endpoint detach etc. + */ + protected void doClose() { + getEndpoint().close(); + } + + /** + * Perform the detach operation on the managed endpoint. + * + * By default this method throws an UnsupportedOperationException, a subclass + * must implement this and do a detach if its resource supports that. + */ + protected void doDetach() { + throw new UnsupportedOperationException("Endpoint cannot be detached."); + } + + /** + * Complete the open operation on the managed endpoint. A subclass may + * override this method to provide additional verification actions or configuration + * updates. + */ + protected void doOpenCompletion() { + LOG.debug("{} is now open: ", this); + opened(); + } + + /** + * When aborting the open operation, and there isnt an error condition, + * provided by the peer, the returned exception will be used instead. + * A subclass may override this method to provide alternative behaviour. + */ + protected Exception getOpenAbortException() { + return new IOException("Open failed unexpectedly."); + } + + // TODO - Fina a more generic way to do this. + protected abstract void doOpenInspection(); + + protected abstract void doClosedInspection(); + + protected void doDetachedInspection() { + } + + //----- Private implementation utility methods ---------------------------// + + private boolean isAwaitingOpen() { + return this.openRequest != null; + } + + private boolean isAwaitingClose() { + return this.closeRequest != null; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java new file mode 100644 index 0000000..001942e --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.transport.amqp.client.transport.NettyTransport; +import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory; +import org.apache.qpid.proton.amqp.Symbol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Connection instance used to connect to the Broker using Proton as + * the AMQP protocol handler. + */ +public class AmqpClient { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpClient.class); + + private final String username; + private final String password; + private final URI remoteURI; + private String authzid; + private String mechanismRestriction; + + private AmqpValidator stateInspector = new AmqpValidator(); + private List<Symbol> offeredCapabilities = Collections.emptyList(); + private Map<Symbol, Object> offeredProperties = Collections.emptyMap(); + + /** + * Creates an AmqpClient instance which can be used as a factory for connections. + * + * @param remoteURI The address of the remote peer to connect to. + * @param username The user name to use when authenticating the client. + * @param password The password to use when authenticating the client. + */ + public AmqpClient(URI remoteURI, String username, String password) { + this.remoteURI = remoteURI; + this.password = password; + this.username = username; + } + + /** + * Creates a connection with the broker at the given location, this method initiates a + * connect attempt immediately and will fail if the remote peer cannot be reached. + * + * @throws Exception if an error occurs attempting to connect to the Broker. + * @returns a new connection object used to interact with the connected peer. + */ + public AmqpConnection connect() throws Exception { + + AmqpConnection connection = createConnection(); + + LOG.debug("Attempting to create new connection to peer: {}", remoteURI); + connection.connect(); + + return connection; + } + + /** + * Creates a connection object using the configured values for user, password, remote URI + * etc. This method does not immediately initiate a connection to the remote leaving that + * to the caller which provides a connection object that can have additional configuration + * changes applied before the <code>connect</code> method is invoked. + * + * @throws Exception if an error occurs attempting to connect to the Broker. + * @returns a new connection object used to interact with the connected peer. + */ + public AmqpConnection createConnection() throws Exception { + if (username == null && password != null) { + throw new IllegalArgumentException("Password must be null if user name value is null"); + } + + NettyTransport transport = NettyTransportFactory.createTransport(remoteURI); + AmqpConnection connection = new AmqpConnection(transport, username, password); + + connection.setMechanismRestriction(mechanismRestriction); + connection.setAuthzid(authzid); + + connection.setOfferedCapabilities(getOfferedCapabilities()); + connection.setOfferedProperties(getOfferedProperties()); + connection.setStateInspector(getStateInspector()); + + return connection; + } + + /** + * @return the user name value given when constructed. + */ + public String getUsername() { + return username; + } + + /** + * @return the password value given when constructed. + */ + public String getPassword() { + return password; + } + + /** + * @param authzid The authzid used when authenticating (currently only with PLAIN) + */ + public void setAuthzid(String authzid) { + this.authzid = authzid; + } + + public String getAuthzid() { + return authzid; + } + + /** + * @param mechanismRestriction The mechanism to use when authenticating (if offered by the server) + */ + public void setMechanismRestriction(String mechanismRestriction) { + this.mechanismRestriction = mechanismRestriction; + } + + public String getMechanismRestriction() { + return mechanismRestriction; + } + + /** + * @return the currently set address to use to connect to the AMQP peer. + */ + public URI getRemoteURI() { + return remoteURI; + } + + /** + * Sets the offered capabilities that should be used when a new connection attempt + * is made. + * + * @param offeredCapabilities the list of capabilities to offer when connecting. + */ + public void setOfferedCapabilities(List<Symbol> offeredCapabilities) { + if (offeredCapabilities != null) { + offeredCapabilities = Collections.emptyList(); + } + + this.offeredCapabilities = offeredCapabilities; + } + + /** + * @return an unmodifiable view of the currently set offered capabilities + */ + public List<Symbol> getOfferedCapabilities() { + return Collections.unmodifiableList(offeredCapabilities); + } + + /** + * Sets the offered connection properties that should be used when a new connection + * attempt is made. + * + * @param offeredProperties the map of properties to offer when connecting. + */ + public void setOfferedProperties(Map<Symbol, Object> offeredProperties) { + if (offeredProperties != null) { + offeredProperties = Collections.emptyMap(); + } + + this.offeredProperties = offeredProperties; + } + + /** + * @return an unmodifiable view of the currently set connection properties. + */ + public Map<Symbol, Object> getOfferedProperties() { + return Collections.unmodifiableMap(offeredProperties); + } + + /** + * @return the currently set state inspector used to check state after various events. + */ + public AmqpValidator getStateInspector() { + return stateInspector; + } + + /** + * Sets the state inspector used to check that the AMQP resource is valid after + * specific lifecycle events such as open and close. + * + * @param stateInspector the new state inspector to use. + */ + public void setValidator(AmqpValidator stateInspector) { + if (stateInspector == null) { + stateInspector = new AmqpValidator(); + } + + this.stateInspector = stateInspector; + } + + @Override + public String toString() { + return "AmqpClient: " + getRemoteURI().getHost() + ":" + getRemoteURI().getPort(); + } + + /** + * Creates an anonymous connection with the broker at the given location. + * + * @param broker the address of the remote broker instance. + * @throws Exception if an error occurs attempting to connect to the Broker. + * @returns a new connection object used to interact with the connected peer. + */ + public static AmqpConnection connect(URI broker) throws Exception { + return connect(broker, null, null); + } + + /** + * Creates a connection with the broker at the given location. + * + * @param broker the address of the remote broker instance. + * @param username the user name to use to connect to the broker or null for anonymous. + * @param password the password to use to connect to the broker, must be null if user name is null. + * @throws Exception if an error occurs attempting to connect to the Broker. + * @returns a new connection object used to interact with the connected peer. + */ + public static AmqpConnection connect(URI broker, String username, String password) throws Exception { + if (username == null && password != null) { + throw new IllegalArgumentException("Password must be null if user name value is null"); + } + + AmqpClient client = new AmqpClient(broker, username, password); + + return client.connect(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java new file mode 100644 index 0000000..1454dd9 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -0,0 +1,720 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; +import org.apache.activemq.transport.InactivityIOException; +import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator; +import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener; +import org.apache.activemq.transport.amqp.client.util.AsyncResult; +import org.apache.activemq.transport.amqp.client.util.ClientFuture; +import org.apache.activemq.transport.amqp.client.util.IdGenerator; +import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult; +import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Event.Type; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.impl.CollectorImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; + +public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); + + private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult(); + + private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1; + // NOTE: Limit default channel max to signed short range to deal with + // brokers that don't currently handle the unsigned range well. + private static final int DEFAULT_CHANNEL_MAX = 32767; + private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); + + public static final long DEFAULT_CONNECT_TIMEOUT = 515000; + public static final long DEFAULT_CLOSE_TIMEOUT = 30000; + public static final long DEFAULT_DRAIN_TIMEOUT = 60000; + + private final ScheduledExecutorService serializer; + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicLong sessionIdGenerator = new AtomicLong(); + private final AtomicLong txIdGenerator = new AtomicLong(); + private final Collector protonCollector = new CollectorImpl(); + private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport; + private final Transport protonTransport = Transport.Factory.create(); + + private final String username; + private final String password; + private final URI remoteURI; + private final String connectionId; + private List<Symbol> offeredCapabilities = Collections.emptyList(); + private Map<Symbol, Object> offeredProperties = Collections.emptyMap(); + + private AmqpConnectionListener listener; + private SaslAuthenticator authenticator; + private String mechanismRestriction; + private String authzid; + + private int idleTimeout = 0; + private boolean idleProcessingDisabled; + private String containerId; + private boolean authenticated; + private int channelMax = DEFAULT_CHANNEL_MAX; + private long connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; + private long drainTimeout = DEFAULT_DRAIN_TIMEOUT; + + public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, + String username, + String password) { + setEndpoint(Connection.Factory.create()); + getEndpoint().collect(protonCollector); + + this.transport = transport; + this.username = username; + this.password = password; + this.connectionId = CONNECTION_ID_GENERATOR.generateId(); + this.remoteURI = transport.getRemoteLocation(); + + this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + + @Override + public Thread newThread(Runnable runner) { + Thread serial = new Thread(runner); + serial.setDaemon(true); + serial.setName(toString()); + return serial; + } + }); + + this.transport.setTransportListener(this); + } + + public void connect() throws Exception { + if (connected.compareAndSet(false, true)) { + transport.connect(); + + final ClientFuture future = new ClientFuture(); + serializer.execute(new Runnable() { + @Override + public void run() { + getEndpoint().setContainer(safeGetContainerId()); + getEndpoint().setHostname(remoteURI.getHost()); + if (!getOfferedCapabilities().isEmpty()) { + getEndpoint().setOfferedCapabilities(getOfferedCapabilities().toArray(new Symbol[0])); + } + if (!getOfferedProperties().isEmpty()) { + getEndpoint().setProperties(getOfferedProperties()); + } + + if (getIdleTimeout() > 0) { + protonTransport.setIdleTimeout(getIdleTimeout()); + } + protonTransport.setMaxFrameSize(getMaxFrameSize()); + protonTransport.setChannelMax(getChannelMax()); + protonTransport.bind(getEndpoint()); + Sasl sasl = protonTransport.sasl(); + if (sasl != null) { + sasl.client(); + } + authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction); + open(future); + + pumpToProtonTransport(future); + } + }); + + if (connectTimeout <= 0) { + future.sync(); + } + else { + future.sync(connectTimeout, TimeUnit.MILLISECONDS); + if (getEndpoint().getRemoteState() != EndpointState.ACTIVE) { + throw new IOException("Failed to connect after configured timeout."); + } + } + } + } + + public boolean isConnected() { + return transport.isConnected() && connected.get(); + } + + public void close() { + if (closed.compareAndSet(false, true)) { + final ClientFuture request = new ClientFuture(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + + // If we are not connected then there is nothing we can do now + // just signal success. + if (!transport.isConnected()) { + request.onSuccess(); + } + + if (getEndpoint() != null) { + close(request); + } + else { + request.onSuccess(); + } + + pumpToProtonTransport(request); + } + catch (Exception e) { + LOG.debug("Caught exception while closing proton connection"); + } + } + }); + + try { + if (closeTimeout <= 0) { + request.sync(); + } + else { + request.sync(closeTimeout, TimeUnit.MILLISECONDS); + } + } + catch (IOException e) { + LOG.warn("Error caught while closing Provider: ", e.getMessage()); + } + finally { + if (transport != null) { + try { + transport.close(); + } + catch (Exception e) { + LOG.debug("Cuaght exception while closing down Transport: {}", e.getMessage()); + } + } + + serializer.shutdown(); + } + } + } + + /** + * Creates a new Session instance used to create AMQP resources like + * senders and receivers. + * + * @return a new AmqpSession that can be used to create links. + * @throws Exception if an error occurs during creation. + */ + public AmqpSession createSession() throws Exception { + checkClosed(); + + final AmqpSession session = new AmqpSession(AmqpConnection.this, getNextSessionId()); + final ClientFuture request = new ClientFuture(); + + serializer.execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + session.setEndpoint(getEndpoint().session()); + session.setStateInspector(getStateInspector()); + session.open(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + + return session; + } + + //----- Access to low level IO for specific test cases -------------------// + + public void sendRawBytes(final byte[] rawData) throws Exception { + checkClosed(); + + final ClientFuture request = new ClientFuture(); + + serializer.execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + try { + transport.send(Unpooled.wrappedBuffer(rawData)); + } + catch (IOException e) { + fireClientException(e); + } + finally { + request.onSuccess(); + } + } + }); + + request.sync(); + } + + //----- Configuration accessors ------------------------------------------// + + /** + * @return the user name that was used to authenticate this connection. + */ + public String getUsername() { + return username; + } + + /** + * @return the password that was used to authenticate this connection. + */ + public String getPassword() { + return password; + } + + public void setAuthzid(String authzid) { + this.authzid = authzid; + } + + public String getAuthzid() { + return authzid; + } + + /** + * @return the URI of the remote peer this connection attached to. + */ + public URI getRemoteURI() { + return remoteURI; + } + + /** + * @return the container ID that will be set as the container Id. + */ + public String getContainerId() { + return this.containerId; + } + + /** + * Sets the container Id that will be configured on the connection prior to + * connecting to the remote peer. Calling this after connect has no effect. + * + * @param containerId the container Id to use on the connection. + */ + public void setContainerId(String containerId) { + this.containerId = containerId; + } + + /** + * @return the currently set Max Frame Size value. + */ + public int getMaxFrameSize() { + return DEFAULT_MAX_FRAME_SIZE; + } + + public int getChannelMax() { + return channelMax; + } + + public void setChannelMax(int channelMax) { + this.channelMax = channelMax; + } + + public long getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(long connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public long getCloseTimeout() { + return closeTimeout; + } + + public void setCloseTimeout(long closeTimeout) { + this.closeTimeout = closeTimeout; + } + + public long getDrainTimeout() { + return drainTimeout; + } + + public void setDrainTimeout(long drainTimeout) { + this.drainTimeout = drainTimeout; + } + + public List<Symbol> getOfferedCapabilities() { + return offeredCapabilities; + } + + public void setOfferedCapabilities(List<Symbol> offeredCapabilities) { + if (offeredCapabilities != null) { + offeredCapabilities = Collections.emptyList(); + } + + this.offeredCapabilities = offeredCapabilities; + } + + public Map<Symbol, Object> getOfferedProperties() { + return offeredProperties; + } + + public void setOfferedProperties(Map<Symbol, Object> offeredProperties) { + if (offeredProperties != null) { + offeredProperties = Collections.emptyMap(); + } + + this.offeredProperties = offeredProperties; + } + + public Connection getConnection() { + return new UnmodifiableConnection(getEndpoint()); + } + + public AmqpConnectionListener getListener() { + return listener; + } + + public void setListener(AmqpConnectionListener listener) { + this.listener = listener; + } + + public int getIdleTimeout() { + return idleTimeout; + } + + public void setIdleTimeout(int idleTimeout) { + this.idleTimeout = idleTimeout; + } + + public void setIdleProcessingDisabled(boolean value) { + this.idleProcessingDisabled = value; + } + + public boolean isIdleProcessingDisabled() { + return idleProcessingDisabled; + } + + /** + * Sets a restriction on the SASL mechanism to use (if offered by the server). + * + * @param mechanismRestriction the mechanism to use + */ + public void setMechanismRestriction(String mechanismRestriction) { + this.mechanismRestriction = mechanismRestriction; + } + + public String getMechanismRestriction() { + return mechanismRestriction; + } + + //----- Internal getters used from the child AmqpResource classes --------// + + ScheduledExecutorService getScheduler() { + return this.serializer; + } + + Connection getProtonConnection() { + return getEndpoint(); + } + + String getConnectionId() { + return this.connectionId; + } + + AmqpTransactionId getNextTransactionId() { + return new AmqpTransactionId(connectionId + ":" + txIdGenerator.incrementAndGet()); + } + + void pumpToProtonTransport() { + pumpToProtonTransport(NOOP_REQUEST); + } + + void pumpToProtonTransport(AsyncResult request) { + try { + boolean done = false; + while (!done) { + ByteBuffer toWrite = protonTransport.getOutputBuffer(); + if (toWrite != null && toWrite.hasRemaining()) { + ByteBuf outbound = transport.allocateSendBuffer(toWrite.remaining()); + outbound.writeBytes(toWrite); + transport.send(outbound); + protonTransport.outputConsumed(); + } + else { + done = true; + } + } + } + catch (IOException e) { + fireClientException(e); + request.onFailure(e); + } + } + + //----- Transport listener event hooks -----------------------------------// + + @Override + public void onData(final ByteBuf incoming) { + + // We need to retain until the serializer gets around to processing it. + ReferenceCountUtil.retain(incoming); + + serializer.execute(new Runnable() { + + @Override + public void run() { + ByteBuffer source = incoming.nioBuffer(); + LOG.trace("Client Received from Broker {} bytes:", source.remaining()); + + if (protonTransport.isClosed()) { + LOG.debug("Ignoring incoming data because transport is closed"); + return; + } + + do { + ByteBuffer buffer = protonTransport.getInputBuffer(); + int limit = Math.min(buffer.remaining(), source.remaining()); + ByteBuffer duplicate = source.duplicate(); + duplicate.limit(source.position() + limit); + buffer.put(duplicate); + protonTransport.processInput(); + source.position(source.position() + limit); + } while (source.hasRemaining()); + + ReferenceCountUtil.release(incoming); + + // Process the state changes from the latest data and then answer back + // any pending updates to the Broker. + processUpdates(); + pumpToProtonTransport(); + } + }); + } + + @Override + public void onTransportClosed() { + LOG.debug("The transport has unexpectedly closed"); + failed(getOpenAbortException()); + } + + @Override + public void onTransportError(Throwable cause) { + fireClientException(cause); + } + + //----- Internal implementation ------------------------------------------// + + @Override + protected void doOpenCompletion() { + // If the remote indicates that a close is pending, don't open. + if (getEndpoint().getRemoteProperties() == null || !getEndpoint().getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { + + if (!isIdleProcessingDisabled()) { + // Using nano time since it is not related to the wall clock, which may change + long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + long initialKeepAliveDeadline = protonTransport.tick(initialNow); + if (initialKeepAliveDeadline > 0) { + + getScheduler().schedule(new Runnable() { + + @Override + public void run() { + try { + if (getEndpoint().getLocalState() != EndpointState.CLOSED) { + LOG.debug("Client performing next idle check"); + // Using nano time since it is not related to the wall clock, which may change + long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + long rescheduleAt = protonTransport.tick(now) - now; + pumpToProtonTransport(); + if (protonTransport.isClosed()) { + LOG.debug("Transport closed after inactivity check."); + throw new InactivityIOException("Channel was inactive for to long"); + } + + if (rescheduleAt > 0) { + getScheduler().schedule(this, rescheduleAt, TimeUnit.MILLISECONDS); + } + } + } + catch (Exception e) { + try { + transport.close(); + } + catch (IOException e1) { + } + fireClientException(e); + } + } + }, initialKeepAliveDeadline - initialNow, TimeUnit.MILLISECONDS); + } + } + super.doOpenCompletion(); + } + } + + @Override + protected void doOpenInspection() { + try { + getStateInspector().inspectOpenedResource(getConnection()); + } + catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + + @Override + protected void doClosedInspection() { + try { + getStateInspector().inspectClosedResource(getConnection()); + } + catch (Throwable error) { + getStateInspector().markAsInvalid(error.getMessage()); + } + } + + protected void fireClientException(Throwable ex) { + AmqpConnectionListener listener = this.listener; + if (listener != null) { + listener.onException(ex); + } + } + + protected void checkClosed() throws IllegalStateException { + if (closed.get()) { + throw new IllegalStateException("The Connection is already closed"); + } + } + + private void processUpdates() { + try { + Event protonEvent = null; + while ((protonEvent = protonCollector.peek()) != null) { + if (!protonEvent.getType().equals(Type.TRANSPORT)) { + LOG.trace("Client: New Proton Event: {}", protonEvent.getType()); + } + + AmqpEventSink amqpEventSink = null; + switch (protonEvent.getType()) { + case CONNECTION_REMOTE_CLOSE: + amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext(); + amqpEventSink.processRemoteClose(this); + break; + case CONNECTION_REMOTE_OPEN: + amqpEventSink = (AmqpEventSink) protonEvent.getConnection().getContext(); + amqpEventSink.processRemoteOpen(this); + break; + case SESSION_REMOTE_CLOSE: + amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext(); + amqpEventSink.processRemoteClose(this); + break; + case SESSION_REMOTE_OPEN: + amqpEventSink = (AmqpEventSink) protonEvent.getSession().getContext(); + amqpEventSink.processRemoteOpen(this); + break; + case LINK_REMOTE_CLOSE: + amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); + amqpEventSink.processRemoteClose(this); + break; + case LINK_REMOTE_DETACH: + amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); + amqpEventSink.processRemoteDetach(this); + break; + case LINK_REMOTE_OPEN: + amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); + amqpEventSink.processRemoteOpen(this); + break; + case LINK_FLOW: + amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); + amqpEventSink.processFlowUpdates(this); + break; + case DELIVERY: + amqpEventSink = (AmqpEventSink) protonEvent.getLink().getContext(); + amqpEventSink.processDeliveryUpdates(this); + break; + default: + break; + } + + protonCollector.pop(); + } + + // We have to do this to pump SASL bytes in as SASL is not event driven yet. + if (!authenticated) { + processSaslAuthentication(); + } + } + catch (Exception ex) { + LOG.warn("Caught Exception during update processing: {}", ex.getMessage(), ex); + fireClientException(ex); + } + } + + private void processSaslAuthentication() { + if (authenticated || authenticator == null) { + return; + } + + try { + if (authenticator.authenticate()) { + authenticator = null; + authenticated = true; + } + } + catch (SecurityException ex) { + failed(ex); + } + } + + private String getNextSessionId() { + return connectionId + ":" + sessionIdGenerator.incrementAndGet(); + } + + private String safeGetContainerId() { + String containerId = getContainerId(); + if (containerId == null || containerId.isEmpty()) { + containerId = UUID.randomUUID().toString(); + } + + return containerId; + } + + @Override + public String toString() { + return "AmqpConnection { " + connectionId + " }"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java new file mode 100644 index 0000000..822170a --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +/** + * Events points exposed by the AmqpClient object. + */ +public interface AmqpConnectionListener { + + /** + * Indicates some error has occurred during client operations. + * + * @param ex The error that triggered this event. + */ + void onException(Throwable ex); + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java new file mode 100644 index 0000000..d2492e9 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +/** + * Default listener implementation that stubs out all the event methods. + */ +public class AmqpDefaultConnectionListener implements AmqpConnectionListener { + + @Override + public void onException(Throwable ex) { + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java new file mode 100644 index 0000000..1c511a5 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpEventSink.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import java.io.IOException; + +/** + * Interface used by classes that want to process AMQP events sent from + * the transport layer. + */ +public interface AmqpEventSink { + + /** + * Event handler for remote peer open of this resource. + * + * @param connection the AmqpConnection instance for easier access to fire events. + * @throws IOException if an error occurs while processing the update. + */ + void processRemoteOpen(AmqpConnection connection) throws IOException; + + /** + * Event handler for remote peer detach of this resource. + * + * @param connection the AmqpConnection instance for easier access to fire events. + * @throws IOException if an error occurs while processing the update. + */ + void processRemoteDetach(AmqpConnection connection) throws IOException; + + /** + * Event handler for remote peer close of this resource. + * + * @param connection the AmqpConnection instance for easier access to fire events. + * @throws IOException if an error occurs while processing the update. + */ + void processRemoteClose(AmqpConnection connection) throws IOException; + + /** + * Called when the Proton Engine signals an Delivery related event has been triggered + * for the given endpoint. + * + * @param connection the AmqpConnection instance for easier access to fire events. + * @throws IOException if an error occurs while processing the update. + */ + void processDeliveryUpdates(AmqpConnection connection) throws IOException; + + /** + * Called when the Proton Engine signals an Flow related event has been triggered + * for the given endpoint. + * + * @param connection the AmqpConnection instance for easier access to fire events. + * @throws IOException if an error occurs while processing the update. + */ + void processFlowUpdates(AmqpConnection connection) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java new file mode 100644 index 0000000..adf5df6 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpJmsSelectorFilter.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.DescribedType; + +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_CODE; + +/** + * A Described Type wrapper for JMS selector values. + */ +public class AmqpJmsSelectorFilter implements DescribedType { + + private final String selector; + + public AmqpJmsSelectorFilter(String selector) { + this.selector = selector; + } + + @Override + public Object getDescriptor() { + return JMS_SELECTOR_CODE; + } + + @Override + public Object getDescribed() { + return this.selector; + } + + @Override + public String toString() { + return "AmqpJmsSelectorType{" + selector + "}"; + } +}
