http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java new file mode 100644 index 0000000..f790433 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java @@ -0,0 +1,402 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import java.io.IOException; +import java.net.URI; +import java.security.Principal; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TCP based transport that uses Netty as the underlying IO layer. + */ +public class NettyTcpTransport implements NettyTransport { + + private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class); + + private static final int QUIET_PERIOD = 20; + private static final int SHUTDOWN_TIMEOUT = 100; + + protected Bootstrap bootstrap; + protected EventLoopGroup group; + protected Channel channel; + protected NettyTransportListener listener; + protected NettyTransportOptions options; + protected final URI remote; + protected boolean secure; + + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final CountDownLatch connectLatch = new CountDownLatch(1); + private IOException failureCause; + private Throwable pendingFailure; + + /** + * Create a new transport instance + * + * @param remoteLocation the URI that defines the remote resource to connect to. + * @param options the transport options used to configure the socket connection. + */ + public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) { + this(null, remoteLocation, options); + } + + /** + * Create a new transport instance + * + * @param listener the TransportListener that will receive events from this Transport. + * @param remoteLocation the URI that defines the remote resource to connect to. + * @param options the transport options used to configure the socket connection. + */ + public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { + this.options = options; + this.listener = listener; + this.remote = remoteLocation; + this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl"); + } + + @Override + public void connect() throws IOException { + + if (listener == null) { + throw new IllegalStateException("A transport listener must be set before connection attempts."); + } + + group = new NioEventLoopGroup(1); + + bootstrap = new Bootstrap(); + bootstrap.group(group); + bootstrap.channel(NioSocketChannel.class); + bootstrap.handler(new ChannelInitializer<Channel>() { + + @Override + public void initChannel(Channel connectedChannel) throws Exception { + configureChannel(connectedChannel); + } + }); + + configureNetty(bootstrap, getTransportOptions()); + + ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort()); + future.addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + handleConnected(future.channel()); + } + else if (future.isCancelled()) { + connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); + } + else { + connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); + } + } + }); + + try { + connectLatch.await(); + } + catch (InterruptedException ex) { + LOG.debug("Transport connection was interrupted."); + Thread.interrupted(); + failureCause = IOExceptionSupport.create(ex); + } + + if (failureCause != null) { + // Close out any Netty resources now as they are no longer needed. + if (channel != null) { + channel.close().syncUninterruptibly(); + channel = null; + } + if (group != null) { + group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + group = null; + } + + throw failureCause; + } + else { + // Connected, allow any held async error to fire now and close the transport. + channel.eventLoop().execute(new Runnable() { + + @Override + public void run() { + if (pendingFailure != null) { + channel.pipeline().fireExceptionCaught(pendingFailure); + } + } + }); + } + } + + @Override + public boolean isConnected() { + return connected.get(); + } + + @Override + public boolean isSSL() { + return secure; + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + connected.set(false); + if (channel != null) { + channel.close().syncUninterruptibly(); + } + if (group != null) { + group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + } + } + } + + @Override + public ByteBuf allocateSendBuffer(int size) throws IOException { + checkConnected(); + return channel.alloc().ioBuffer(size, size); + } + + @Override + public void send(ByteBuf output) throws IOException { + checkConnected(); + int length = output.readableBytes(); + if (length == 0) { + return; + } + + LOG.trace("Attempted write of: {} bytes", length); + + channel.writeAndFlush(output); + } + + @Override + public NettyTransportListener getTransportListener() { + return listener; + } + + @Override + public void setTransportListener(NettyTransportListener listener) { + this.listener = listener; + } + + @Override + public NettyTransportOptions getTransportOptions() { + if (options == null) { + if (isSSL()) { + options = NettyTransportSslOptions.INSTANCE; + } + else { + options = NettyTransportOptions.INSTANCE; + } + } + + return options; + } + + @Override + public URI getRemoteLocation() { + return remote; + } + + @Override + public Principal getLocalPrincipal() { + if (!isSSL()) { + throw new UnsupportedOperationException("Not connected to a secure channel"); + } + + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + + return sslHandler.engine().getSession().getLocalPrincipal(); + } + + //----- Internal implementation details, can be overridden as needed --// + + protected String getRemoteHost() { + return remote.getHost(); + } + + protected int getRemotePort() { + int port = remote.getPort(); + + if (port <= 0) { + if (isSSL()) { + port = getSslOptions().getDefaultSslPort(); + } + else { + port = getTransportOptions().getDefaultTcpPort(); + } + } + + return port; + } + + protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { + bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); + bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); + bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); + bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); + + if (options.getSendBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); + } + + if (options.getReceiveBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize()); + bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize())); + } + + if (options.getTrafficClass() != -1) { + bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); + } + } + + protected void configureChannel(final Channel channel) throws Exception { + if (isSSL()) { + SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); + sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() { + @Override + public void operationComplete(Future<Channel> future) throws Exception { + if (future.isSuccess()) { + LOG.trace("SSL Handshake has completed: {}", channel); + connectionEstablished(channel); + } + else { + LOG.trace("SSL Handshake has failed: {}", channel); + connectionFailed(channel, IOExceptionSupport.create(future.cause())); + } + } + }); + + channel.pipeline().addLast(sslHandler); + } + + channel.pipeline().addLast(new NettyTcpTransportHandler()); + } + + protected void handleConnected(final Channel channel) throws Exception { + if (!isSSL()) { + connectionEstablished(channel); + } + } + + //----- State change handlers and checks ---------------------------------// + + /** + * Called when the transport has successfully connected and is ready for use. + */ + protected void connectionEstablished(Channel connectedChannel) { + channel = connectedChannel; + connected.set(true); + connectLatch.countDown(); + } + + /** + * Called when the transport connection failed and an error should be returned. + * + * @param failedChannel The Channel instance that failed. + * @param cause An IOException that describes the cause of the failed connection. + */ + protected void connectionFailed(Channel failedChannel, IOException cause) { + failureCause = IOExceptionSupport.create(cause); + channel = failedChannel; + connected.set(false); + connectLatch.countDown(); + } + + private NettyTransportSslOptions getSslOptions() { + return (NettyTransportSslOptions) getTransportOptions(); + } + + private void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } + + //----- Handle connection events -----------------------------------------// + + private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> { + + @Override + public void channelActive(ChannelHandlerContext context) throws Exception { + LOG.trace("Channel has become active! Channel is {}", context.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext context) throws Exception { + LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportClosed listener"); + listener.onTransportClosed(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { + LOG.trace("Exception on channel! Channel is {}", context.channel()); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportError listener"); + if (pendingFailure != null) { + listener.onTransportError(pendingFailure); + } + else { + listener.onTransportError(cause); + } + } + else { + // Hold the first failure for later dispatch if connect succeeds. + // This will then trigger disconnect using the first error reported. + if (pendingFailure != null) { + LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); + pendingFailure = cause; + } + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer); + listener.onData(buffer); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java new file mode 100644 index 0000000..a2bacdc --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import java.io.IOException; +import java.net.URI; +import java.security.Principal; + +import io.netty.buffer.ByteBuf; + +/** + * + */ +public interface NettyTransport { + + void connect() throws IOException; + + boolean isConnected(); + + boolean isSSL(); + + void close() throws IOException; + + ByteBuf allocateSendBuffer(int size) throws IOException; + + void send(ByteBuf output) throws IOException; + + NettyTransportListener getTransportListener(); + + void setTransportListener(NettyTransportListener listener); + + NettyTransportOptions getTransportOptions(); + + URI getRemoteLocation(); + + Principal getLocalPrincipal(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java new file mode 100644 index 0000000..5663713 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import java.net.URI; +import java.util.Map; + +import org.apache.activemq.transport.amqp.client.util.PropertyUtil; + +/** + * Factory for creating the Netty based TCP Transport. + */ +public final class NettyTransportFactory { + + private NettyTransportFactory() { + } + + /** + * Creates an instance of the given Transport and configures it using the + * properties set on the given remote broker URI. + * + * @param remoteURI The URI used to connect to a remote Peer. + * @return a new Transport instance. + * @throws Exception if an error occurs while creating the Transport instance. + */ + public static NettyTransport createTransport(URI remoteURI) throws Exception { + Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery()); + Map<String, String> transportURIOptions = PropertyUtil.filterProperties(map, "transport."); + NettyTransportOptions transportOptions = null; + + remoteURI = PropertyUtil.replaceQuery(remoteURI, map); + + if (!remoteURI.getScheme().equalsIgnoreCase("ssl") && !remoteURI.getScheme().equalsIgnoreCase("wss")) { + transportOptions = NettyTransportOptions.INSTANCE.clone(); + } + else { + transportOptions = NettyTransportSslOptions.INSTANCE.clone(); + } + + Map<String, String> unused = PropertyUtil.setProperties(transportOptions, transportURIOptions); + if (!unused.isEmpty()) { + String msg = " Not all transport options could be set on the TCP based" + + " Transport. Check the options are spelled correctly." + + " Unused parameters=[" + unused + "]." + + " This provider instance cannot be started."; + throw new IllegalArgumentException(msg); + } + + NettyTransport result = null; + + switch (remoteURI.getScheme().toLowerCase()) { + case "tcp": + case "ssl": + result = new NettyTcpTransport(remoteURI, transportOptions); + break; + case "ws": + case "wss": + result = new NettyWSTransport(remoteURI, transportOptions); + break; + default: + throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme()); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java new file mode 100644 index 0000000..c23ca8c --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import io.netty.buffer.ByteBuf; + +/** + * Listener interface that should be implemented by users of the various + * QpidJMS Transport classes. + */ +public interface NettyTransportListener { + + /** + * Called when new incoming data has become available. + * + * @param incoming the next incoming packet of data. + */ + void onData(ByteBuf incoming); + + /** + * Called if the connection state becomes closed. + */ + void onTransportClosed(); + + /** + * Called when an error occurs during normal Transport operations. + * + * @param cause the error that triggered this event. + */ + void onTransportError(Throwable cause); + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java new file mode 100644 index 0000000..3ffb8c8 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +/** + * Encapsulates all the TCP Transport options in one configuration object. + */ +public class NettyTransportOptions implements Cloneable { + + public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024; + public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE; + public static final int DEFAULT_TRAFFIC_CLASS = 0; + public static final boolean DEFAULT_TCP_NO_DELAY = true; + public static final boolean DEFAULT_TCP_KEEP_ALIVE = false; + public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE; + public static final int DEFAULT_SO_TIMEOUT = -1; + public static final int DEFAULT_CONNECT_TIMEOUT = 60000; + public static final int DEFAULT_TCP_PORT = 5672; + + public static final NettyTransportOptions INSTANCE = new NettyTransportOptions(); + + private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; + private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; + private int trafficClass = DEFAULT_TRAFFIC_CLASS; + private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private int soTimeout = DEFAULT_SO_TIMEOUT; + private int soLinger = DEFAULT_SO_LINGER; + private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE; + private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; + private int defaultTcpPort = DEFAULT_TCP_PORT; + + /** + * @return the currently set send buffer size in bytes. + */ + public int getSendBufferSize() { + return sendBufferSize; + } + + /** + * Sets the send buffer size in bytes, the value must be greater than zero + * or an {@link IllegalArgumentException} will be thrown. + * + * @param sendBufferSize the new send buffer size for the TCP Transport. + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setSendBufferSize(int sendBufferSize) { + if (sendBufferSize <= 0) { + throw new IllegalArgumentException("The send buffer size must be > 0"); + } + + this.sendBufferSize = sendBufferSize; + } + + /** + * @return the currently configured receive buffer size in bytes. + */ + public int getReceiveBufferSize() { + return receiveBufferSize; + } + + /** + * Sets the receive buffer size in bytes, the value must be greater than zero + * or an {@link IllegalArgumentException} will be thrown. + * + * @param receiveBufferSize the new receive buffer size for the TCP Transport. + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setReceiveBufferSize(int receiveBufferSize) { + if (receiveBufferSize <= 0) { + throw new IllegalArgumentException("The send buffer size must be > 0"); + } + + this.receiveBufferSize = receiveBufferSize; + } + + /** + * @return the currently configured traffic class value. + */ + public int getTrafficClass() { + return trafficClass; + } + + /** + * Sets the traffic class value used by the TCP connection, valid + * range is between 0 and 255. + * + * @param trafficClass the new traffic class value. + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setTrafficClass(int trafficClass) { + if (trafficClass < 0 || trafficClass > 255) { + throw new IllegalArgumentException("Traffic class must be in the range [0..255]"); + } + + this.trafficClass = trafficClass; + } + + public int getSoTimeout() { + return soTimeout; + } + + public void setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + } + + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public int getSoLinger() { + return soLinger; + } + + public void setSoLinger(int soLinger) { + this.soLinger = soLinger; + } + + public boolean isTcpKeepAlive() { + return tcpKeepAlive; + } + + public void setTcpKeepAlive(boolean keepAlive) { + this.tcpKeepAlive = keepAlive; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public int getDefaultTcpPort() { + return defaultTcpPort; + } + + public void setDefaultTcpPort(int defaultTcpPort) { + this.defaultTcpPort = defaultTcpPort; + } + + @Override + public NettyTransportOptions clone() { + return copyOptions(new NettyTransportOptions()); + } + + protected NettyTransportOptions copyOptions(NettyTransportOptions copy) { + copy.setConnectTimeout(getConnectTimeout()); + copy.setReceiveBufferSize(getReceiveBufferSize()); + copy.setSendBufferSize(getSendBufferSize()); + copy.setSoLinger(getSoLinger()); + copy.setSoTimeout(getSoTimeout()); + copy.setTcpKeepAlive(isTcpKeepAlive()); + copy.setTcpNoDelay(isTcpNoDelay()); + copy.setTrafficClass(getTrafficClass()); + + return copy; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java new file mode 100644 index 0000000..e256fbb --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Holds the defined SSL options for connections that operate over a secure + * transport. Options are read from the environment and can be overridden by + * specifying them on the connection URI. + */ +public class NettyTransportSslOptions extends NettyTransportOptions { + + public static final String DEFAULT_STORE_TYPE = "jks"; + public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS"; + public static final boolean DEFAULT_TRUST_ALL = false; + public static final boolean DEFAULT_VERIFY_HOST = false; + public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"})); + public static final int DEFAULT_SSL_PORT = 5671; + + public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions(); + + private String keyStoreLocation; + private String keyStorePassword; + private String trustStoreLocation; + private String trustStorePassword; + private String storeType = DEFAULT_STORE_TYPE; + private String[] enabledCipherSuites; + private String[] disabledCipherSuites; + private String[] enabledProtocols; + private String[] disabledProtocols = DEFAULT_DISABLED_PROTOCOLS.toArray(new String[0]); + private String contextProtocol = DEFAULT_CONTEXT_PROTOCOL; + + private boolean trustAll = DEFAULT_TRUST_ALL; + private boolean verifyHost = DEFAULT_VERIFY_HOST; + private String keyAlias; + private int defaultSslPort = DEFAULT_SSL_PORT; + + static { + INSTANCE.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore")); + INSTANCE.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword")); + INSTANCE.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore")); + INSTANCE.setTrustStorePassword(System.getProperty("javax.net.ssl.keyStorePassword")); + } + + /** + * @return the keyStoreLocation currently configured. + */ + public String getKeyStoreLocation() { + return keyStoreLocation; + } + + /** + * Sets the location on disk of the key store to use. + * + * @param keyStoreLocation the keyStoreLocation to use to create the key manager. + */ + public void setKeyStoreLocation(String keyStoreLocation) { + this.keyStoreLocation = keyStoreLocation; + } + + /** + * @return the keyStorePassword + */ + public String getKeyStorePassword() { + return keyStorePassword; + } + + /** + * @param keyStorePassword the keyStorePassword to set + */ + public void setKeyStorePassword(String keyStorePassword) { + this.keyStorePassword = keyStorePassword; + } + + /** + * @return the trustStoreLocation + */ + public String getTrustStoreLocation() { + return trustStoreLocation; + } + + /** + * @param trustStoreLocation the trustStoreLocation to set + */ + public void setTrustStoreLocation(String trustStoreLocation) { + this.trustStoreLocation = trustStoreLocation; + } + + /** + * @return the trustStorePassword + */ + public String getTrustStorePassword() { + return trustStorePassword; + } + + /** + * @param trustStorePassword the trustStorePassword to set + */ + public void setTrustStorePassword(String trustStorePassword) { + this.trustStorePassword = trustStorePassword; + } + + /** + * @return the storeType + */ + public String getStoreType() { + return storeType; + } + + /** + * @param storeType the format that the store files are encoded in. + */ + public void setStoreType(String storeType) { + this.storeType = storeType; + } + + /** + * @return the enabledCipherSuites + */ + public String[] getEnabledCipherSuites() { + return enabledCipherSuites; + } + + /** + * @param enabledCipherSuites the enabledCipherSuites to set + */ + public void setEnabledCipherSuites(String[] enabledCipherSuites) { + this.enabledCipherSuites = enabledCipherSuites; + } + + /** + * @return the disabledCipherSuites + */ + public String[] getDisabledCipherSuites() { + return disabledCipherSuites; + } + + /** + * @param disabledCipherSuites the disabledCipherSuites to set + */ + public void setDisabledCipherSuites(String[] disabledCipherSuites) { + this.disabledCipherSuites = disabledCipherSuites; + } + + /** + * @return the enabledProtocols or null if the defaults should be used + */ + public String[] getEnabledProtocols() { + return enabledProtocols; + } + + /** + * The protocols to be set as enabled. + * + * @param enabledProtocols the enabled protocols to set, or null if the defaults should be used. + */ + public void setEnabledProtocols(String[] enabledProtocols) { + this.enabledProtocols = enabledProtocols; + } + + /** + * @return the protocols to disable or null if none should be + */ + public String[] getDisabledProtocols() { + return disabledProtocols; + } + + /** + * The protocols to be disable. + * + * @param disabledProtocols the protocols to disable, or null if none should be. + */ + public void setDisabledProtocols(String[] disabledProtocols) { + this.disabledProtocols = disabledProtocols; + } + + /** + * @return the context protocol to use + */ + public String getContextProtocol() { + return contextProtocol; + } + + /** + * The protocol value to use when creating an SSLContext via + * SSLContext.getInstance(protocol). + * + * @param contextProtocol the context protocol to use. + */ + public void setContextProtocol(String contextProtocol) { + this.contextProtocol = contextProtocol; + } + + /** + * @return the trustAll + */ + public boolean isTrustAll() { + return trustAll; + } + + /** + * @param trustAll the trustAll to set + */ + public void setTrustAll(boolean trustAll) { + this.trustAll = trustAll; + } + + /** + * @return the verifyHost + */ + public boolean isVerifyHost() { + return verifyHost; + } + + /** + * @param verifyHost the verifyHost to set + */ + public void setVerifyHost(boolean verifyHost) { + this.verifyHost = verifyHost; + } + + /** + * @return the key alias + */ + public String getKeyAlias() { + return keyAlias; + } + + /** + * @param keyAlias the key alias to use + */ + public void setKeyAlias(String keyAlias) { + this.keyAlias = keyAlias; + } + + public int getDefaultSslPort() { + return defaultSslPort; + } + + public void setDefaultSslPort(int defaultSslPort) { + this.defaultSslPort = defaultSslPort; + } + + @Override + public NettyTransportSslOptions clone() { + return copyOptions(new NettyTransportSslOptions()); + } + + protected NettyTransportSslOptions copyOptions(NettyTransportSslOptions copy) { + super.copyOptions(copy); + + copy.setKeyStoreLocation(getKeyStoreLocation()); + copy.setKeyStorePassword(getKeyStorePassword()); + copy.setTrustStoreLocation(getTrustStoreLocation()); + copy.setTrustStorePassword(getTrustStorePassword()); + copy.setStoreType(getStoreType()); + copy.setEnabledCipherSuites(getEnabledCipherSuites()); + copy.setDisabledCipherSuites(getDisabledCipherSuites()); + copy.setEnabledProtocols(getEnabledProtocols()); + copy.setDisabledProtocols(getDisabledProtocols()); + copy.setTrustAll(isTrustAll()); + copy.setVerifyHost(isVerifyHost()); + copy.setKeyAlias(getKeyAlias()); + copy.setContextProtocol(getContextProtocol()); + return copy; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java new file mode 100644 index 0000000..51cedea --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509ExtendedKeyManager; +import javax.net.ssl.X509TrustManager; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.URI; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import io.netty.handler.ssl.SslHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Static class that provides various utility methods used by Transport implementations. + */ +public class NettyTransportSupport { + + private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class); + + /** + * Creates a Netty SslHandler instance for use in Transports that require + * an SSL encoder / decoder. + * + * @param remote The URI of the remote peer that the SslHandler will be used against. + * @param options The SSL options object to build the SslHandler instance from. + * @return a new SslHandler that is configured from the given options. + * @throws Exception if an error occurs while creating the SslHandler instance. + */ + public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception { + return new SslHandler(createSslEngine(remote, createSslContext(options), options)); + } + + /** + * Create a new SSLContext using the options specific in the given TransportSslOptions + * instance. + * + * @param options the configured options used to create the SSLContext. + * @return a new SSLContext instance. + * @throws Exception if an error occurs while creating the context. + */ + public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception { + try { + String contextProtocol = options.getContextProtocol(); + LOG.trace("Getting SSLContext instance using protocol: {}", contextProtocol); + + SSLContext context = SSLContext.getInstance(contextProtocol); + KeyManager[] keyMgrs = loadKeyManagers(options); + TrustManager[] trustManagers = loadTrustManagers(options); + + context.init(keyMgrs, trustManagers, new SecureRandom()); + return context; + } + catch (Exception e) { + LOG.error("Failed to create SSLContext: {}", e, e); + throw e; + } + } + + /** + * Create a new SSLEngine instance in client mode from the given SSLContext and + * TransportSslOptions instances. + * + * @param context the SSLContext to use when creating the engine. + * @param options the TransportSslOptions to use to configure the new SSLEngine. + * @return a new SSLEngine instance in client mode. + * @throws Exception if an error occurs while creating the new SSLEngine. + */ + public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception { + return createSslEngine(null, context, options); + } + + /** + * Create a new SSLEngine instance in client mode from the given SSLContext and + * TransportSslOptions instances. + * + * @param remote the URI of the remote peer that will be used to initialize the engine, may be null if none should. + * @param context the SSLContext to use when creating the engine. + * @param options the TransportSslOptions to use to configure the new SSLEngine. + * @return a new SSLEngine instance in client mode. + * @throws Exception if an error occurs while creating the new SSLEngine. + */ + public static SSLEngine createSslEngine(URI remote, + SSLContext context, + NettyTransportSslOptions options) throws Exception { + SSLEngine engine = null; + if (remote == null) { + engine = context.createSSLEngine(); + } + else { + engine = context.createSSLEngine(remote.getHost(), remote.getPort()); + } + + engine.setEnabledProtocols(buildEnabledProtocols(engine, options)); + engine.setEnabledCipherSuites(buildEnabledCipherSuites(engine, options)); + engine.setUseClientMode(true); + + if (options.isVerifyHost()) { + SSLParameters sslParameters = engine.getSSLParameters(); + sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); + engine.setSSLParameters(sslParameters); + } + + return engine; + } + + private static String[] buildEnabledProtocols(SSLEngine engine, NettyTransportSslOptions options) { + List<String> enabledProtocols = new ArrayList<>(); + + if (options.getEnabledProtocols() != null) { + List<String> configuredProtocols = Arrays.asList(options.getEnabledProtocols()); + LOG.trace("Configured protocols from transport options: {}", configuredProtocols); + enabledProtocols.addAll(configuredProtocols); + } + else { + List<String> engineProtocols = Arrays.asList(engine.getEnabledProtocols()); + LOG.trace("Default protocols from the SSLEngine: {}", engineProtocols); + enabledProtocols.addAll(engineProtocols); + } + + String[] disabledProtocols = options.getDisabledProtocols(); + if (disabledProtocols != null) { + List<String> disabled = Arrays.asList(disabledProtocols); + LOG.trace("Disabled protocols: {}", disabled); + enabledProtocols.removeAll(disabled); + } + + LOG.trace("Enabled protocols: {}", enabledProtocols); + + return enabledProtocols.toArray(new String[0]); + } + + private static String[] buildEnabledCipherSuites(SSLEngine engine, NettyTransportSslOptions options) { + List<String> enabledCipherSuites = new ArrayList<>(); + + if (options.getEnabledCipherSuites() != null) { + List<String> configuredCipherSuites = Arrays.asList(options.getEnabledCipherSuites()); + LOG.trace("Configured cipher suites from transport options: {}", configuredCipherSuites); + enabledCipherSuites.addAll(configuredCipherSuites); + } + else { + List<String> engineCipherSuites = Arrays.asList(engine.getEnabledCipherSuites()); + LOG.trace("Default cipher suites from the SSLEngine: {}", engineCipherSuites); + enabledCipherSuites.addAll(engineCipherSuites); + } + + String[] disabledCipherSuites = options.getDisabledCipherSuites(); + if (disabledCipherSuites != null) { + List<String> disabled = Arrays.asList(disabledCipherSuites); + LOG.trace("Disabled cipher suites: {}", disabled); + enabledCipherSuites.removeAll(disabled); + } + + LOG.trace("Enabled cipher suites: {}", enabledCipherSuites); + + return enabledCipherSuites.toArray(new String[0]); + } + + private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception { + if (options.isTrustAll()) { + return new TrustManager[]{createTrustAllTrustManager()}; + } + + if (options.getTrustStoreLocation() == null) { + return null; + } + + TrustManagerFactory fact = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + + String storeLocation = options.getTrustStoreLocation(); + String storePassword = options.getTrustStorePassword(); + String storeType = options.getStoreType(); + + LOG.trace("Attempt to load TrustStore from location {} of type {}", storeLocation, storeType); + + KeyStore trustStore = loadStore(storeLocation, storePassword, storeType); + fact.init(trustStore); + + return fact.getTrustManagers(); + } + + private static KeyManager[] loadKeyManagers(NettyTransportSslOptions options) throws Exception { + if (options.getKeyStoreLocation() == null) { + return null; + } + + KeyManagerFactory fact = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + + String storeLocation = options.getKeyStoreLocation(); + String storePassword = options.getKeyStorePassword(); + String storeType = options.getStoreType(); + String alias = options.getKeyAlias(); + + LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType); + + KeyStore keyStore = loadStore(storeLocation, storePassword, storeType); + fact.init(keyStore, storePassword != null ? storePassword.toCharArray() : null); + + if (alias == null) { + return fact.getKeyManagers(); + } + else { + validateAlias(keyStore, alias); + return wrapKeyManagers(alias, fact.getKeyManagers()); + } + } + + private static KeyManager[] wrapKeyManagers(String alias, KeyManager[] origKeyManagers) { + KeyManager[] keyManagers = new KeyManager[origKeyManagers.length]; + for (int i = 0; i < origKeyManagers.length; i++) { + KeyManager km = origKeyManagers[i]; + if (km instanceof X509ExtendedKeyManager) { + km = new X509AliasKeyManager(alias, (X509ExtendedKeyManager) km); + } + + keyManagers[i] = km; + } + + return keyManagers; + } + + private static void validateAlias(KeyStore store, String alias) throws IllegalArgumentException, KeyStoreException { + if (!store.containsAlias(alias)) { + throw new IllegalArgumentException("The alias '" + alias + "' doesn't exist in the key store"); + } + + if (!store.isKeyEntry(alias)) { + throw new IllegalArgumentException("The alias '" + alias + "' in the keystore doesn't represent a key entry"); + } + } + + private static KeyStore loadStore(String storePath, final String password, String storeType) throws Exception { + KeyStore store = KeyStore.getInstance(storeType); + try (InputStream in = new FileInputStream(new File(storePath));) { + store.load(in, password != null ? password.toCharArray() : null); + } + + return store; + } + + private static TrustManager createTrustAllTrustManager() { + return new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java new file mode 100644 index 0000000..b28f523 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import java.io.IOException; +import java.net.URI; +import java.security.Principal; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Transport for communicating over WebSockets + */ +public class NettyWSTransport implements NettyTransport { + + private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class); + + private static final int QUIET_PERIOD = 20; + private static final int SHUTDOWN_TIMEOUT = 100; + + protected Bootstrap bootstrap; + protected EventLoopGroup group; + protected Channel channel; + protected NettyTransportListener listener; + protected NettyTransportOptions options; + protected final URI remote; + protected boolean secure; + + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + private ChannelPromise handshakeFuture; + private IOException failureCause; + private Throwable pendingFailure; + + /** + * Create a new transport instance + * + * @param remoteLocation the URI that defines the remote resource to connect to. + * @param options the transport options used to configure the socket connection. + */ + public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) { + this(null, remoteLocation, options); + } + + /** + * Create a new transport instance + * + * @param listener the TransportListener that will receive events from this Transport. + * @param remoteLocation the URI that defines the remote resource to connect to. + * @param options the transport options used to configure the socket connection. + */ + public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { + this.options = options; + this.listener = listener; + this.remote = remoteLocation; + this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss"); + } + + @Override + public void connect() throws IOException { + + if (listener == null) { + throw new IllegalStateException("A transport listener must be set before connection attempts."); + } + + group = new NioEventLoopGroup(1); + + bootstrap = new Bootstrap(); + bootstrap.group(group); + bootstrap.channel(NioSocketChannel.class); + bootstrap.handler(new ChannelInitializer<Channel>() { + + @Override + public void initChannel(Channel connectedChannel) throws Exception { + configureChannel(connectedChannel); + } + }); + + configureNetty(bootstrap, getTransportOptions()); + + ChannelFuture future; + try { + future = bootstrap.connect(getRemoteHost(), getRemotePort()); + future.addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + handleConnected(future.channel()); + } + else if (future.isCancelled()) { + connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); + } + else { + connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); + } + } + }); + + future.sync(); + + // Now wait for WS protocol level handshake completion + handshakeFuture.await(); + } + catch (InterruptedException ex) { + LOG.debug("Transport connection attempt was interrupted."); + Thread.interrupted(); + failureCause = IOExceptionSupport.create(ex); + } + + if (failureCause != null) { + // Close out any Netty resources now as they are no longer needed. + if (channel != null) { + channel.close().syncUninterruptibly(); + channel = null; + } + if (group != null) { + group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + group = null; + } + + throw failureCause; + } + else { + // Connected, allow any held async error to fire now and close the transport. + channel.eventLoop().execute(new Runnable() { + + @Override + public void run() { + if (pendingFailure != null) { + channel.pipeline().fireExceptionCaught(pendingFailure); + } + } + }); + } + } + + @Override + public boolean isConnected() { + return connected.get(); + } + + @Override + public boolean isSSL() { + return secure; + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + connected.set(false); + if (channel != null) { + channel.close().syncUninterruptibly(); + } + if (group != null) { + group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + } + } + } + + @Override + public ByteBuf allocateSendBuffer(int size) throws IOException { + checkConnected(); + return channel.alloc().ioBuffer(size, size); + } + + @Override + public void send(ByteBuf output) throws IOException { + checkConnected(); + int length = output.readableBytes(); + if (length == 0) { + return; + } + + LOG.trace("Attempted write of: {} bytes", length); + + channel.writeAndFlush(new BinaryWebSocketFrame(output)); + } + + @Override + public NettyTransportListener getTransportListener() { + return listener; + } + + @Override + public void setTransportListener(NettyTransportListener listener) { + this.listener = listener; + } + + @Override + public NettyTransportOptions getTransportOptions() { + if (options == null) { + if (isSSL()) { + options = NettyTransportSslOptions.INSTANCE; + } + else { + options = NettyTransportOptions.INSTANCE; + } + } + + return options; + } + + @Override + public URI getRemoteLocation() { + return remote; + } + + @Override + public Principal getLocalPrincipal() { + if (!isSSL()) { + throw new UnsupportedOperationException("Not connected to a secure channel"); + } + + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + + return sslHandler.engine().getSession().getLocalPrincipal(); + } + + //----- Internal implementation details, can be overridden as needed --// + + protected String getRemoteHost() { + return remote.getHost(); + } + + protected int getRemotePort() { + int port = remote.getPort(); + + if (port <= 0) { + if (isSSL()) { + port = getSslOptions().getDefaultSslPort(); + } + else { + port = getTransportOptions().getDefaultTcpPort(); + } + } + + return port; + } + + protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { + bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); + bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); + bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); + bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); + + if (options.getSendBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); + } + + if (options.getReceiveBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize()); + bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize())); + } + + if (options.getTrafficClass() != -1) { + bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); + } + } + + protected void configureChannel(final Channel channel) throws Exception { + if (isSSL()) { + SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); + sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() { + @Override + public void operationComplete(Future<Channel> future) throws Exception { + if (future.isSuccess()) { + LOG.trace("SSL Handshake has completed: {}", channel); + connectionEstablished(channel); + } + else { + LOG.trace("SSL Handshake has failed: {}", channel); + connectionFailed(channel, IOExceptionSupport.create(future.cause())); + } + } + }); + + channel.pipeline().addLast(sslHandler); + } + + channel.pipeline().addLast(new HttpClientCodec()); + channel.pipeline().addLast(new HttpObjectAggregator(8192)); + channel.pipeline().addLast(new NettyTcpTransportHandler()); + } + + protected void handleConnected(final Channel channel) throws Exception { + if (!isSSL()) { + connectionEstablished(channel); + } + } + + //----- State change handlers and checks ---------------------------------// + + /** + * Called when the transport has successfully connected and is ready for use. + */ + protected void connectionEstablished(Channel connectedChannel) { + LOG.info("WebSocket connectionEstablished! {}", connectedChannel); + channel = connectedChannel; + connected.set(true); + } + + /** + * Called when the transport connection failed and an error should be returned. + * + * @param failedChannel The Channel instance that failed. + * @param cause An IOException that describes the cause of the failed connection. + */ + protected void connectionFailed(Channel failedChannel, IOException cause) { + failureCause = IOExceptionSupport.create(cause); + channel = failedChannel; + connected.set(false); + handshakeFuture.setFailure(cause); + } + + private NettyTransportSslOptions getSslOptions() { + return (NettyTransportSslOptions) getTransportOptions(); + } + + private void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } + + //----- Handle connection events -----------------------------------------// + + private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> { + + private final WebSocketClientHandshaker handshaker; + + NettyTcpTransportHandler() { + handshaker = WebSocketClientHandshakerFactory.newHandshaker(remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders()); + } + + @Override + public void handlerAdded(ChannelHandlerContext context) { + LOG.trace("Handler has become added! Channel is {}", context.channel()); + handshakeFuture = context.newPromise(); + } + + @Override + public void channelActive(ChannelHandlerContext context) throws Exception { + LOG.trace("Channel has become active! Channel is {}", context.channel()); + handshaker.handshake(context.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext context) throws Exception { + LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportClosed listener"); + listener.onTransportClosed(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { + LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage()); + LOG.trace("Error Stack: ", cause); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportError listener"); + if (pendingFailure != null) { + listener.onTransportError(pendingFailure); + } + else { + listener.onTransportError(cause); + } + } + else { + // Hold the first failure for later dispatch if connect succeeds. + // This will then trigger disconnect using the first error reported. + if (pendingFailure != null) { + LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); + pendingFailure = cause; + } + + if (!handshakeFuture.isDone()) { + handshakeFuture.setFailure(cause); + } + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception { + LOG.trace("New data read: incoming: {}", message); + + Channel ch = ctx.channel(); + if (!handshaker.isHandshakeComplete()) { + handshaker.finishHandshake(ch, (FullHttpResponse) message); + LOG.info("WebSocket Client connected! {}", ctx.channel()); + handshakeFuture.setSuccess(); + return; + } + + // We shouldn't get this since we handle the handshake previously. + if (message instanceof FullHttpResponse) { + FullHttpResponse response = (FullHttpResponse) message; + throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); + } + + WebSocketFrame frame = (WebSocketFrame) message; + if (frame instanceof TextWebSocketFrame) { + TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; + LOG.warn("WebSocket Client received message: " + textFrame.text()); + ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket.")); + } + else if (frame instanceof BinaryWebSocketFrame) { + BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame; + LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes()); + listener.onData(binaryFrame.content()); + } + else if (frame instanceof PongWebSocketFrame) { + LOG.trace("WebSocket Client received pong"); + } + else if (frame instanceof CloseWebSocketFrame) { + LOG.trace("WebSocket Client received closing"); + ch.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java new file mode 100644 index 0000000..c3c4286 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; + +/** + * A {@link ByteBufAllocator} which is partial pooled. Which means only direct + * {@link ByteBuf}s are pooled. The rest is unpooled. + * + */ +public class PartialPooledByteBufAllocator implements ByteBufAllocator { + + private static final ByteBufAllocator POOLED = new PooledByteBufAllocator(false); + private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false); + + public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator(); + + private PartialPooledByteBufAllocator() { + } + + @Override + public ByteBuf buffer() { + return UNPOOLED.heapBuffer(); + } + + @Override + public ByteBuf buffer(int initialCapacity) { + return UNPOOLED.heapBuffer(initialCapacity); + } + + @Override + public ByteBuf buffer(int initialCapacity, int maxCapacity) { + return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf ioBuffer() { + return UNPOOLED.heapBuffer(); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity) { + return UNPOOLED.heapBuffer(initialCapacity); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { + return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer() { + return UNPOOLED.heapBuffer(); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity) { + return UNPOOLED.heapBuffer(initialCapacity); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf directBuffer() { + return POOLED.directBuffer(); + } + + @Override + public ByteBuf directBuffer(int initialCapacity) { + return POOLED.directBuffer(initialCapacity); + } + + @Override + public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { + return POOLED.directBuffer(initialCapacity, maxCapacity); + } + + @Override + public CompositeByteBuf compositeBuffer() { + return UNPOOLED.compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeBuffer(int maxNumComponents) { + return UNPOOLED.compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeHeapBuffer() { + return UNPOOLED.compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { + return UNPOOLED.compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeDirectBuffer() { + return POOLED.compositeDirectBuffer(); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { + return POOLED.compositeDirectBuffer(); + } + + @Override + public boolean isDirectBufferPooled() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java new file mode 100644 index 0000000..42d6a0b --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.X509ExtendedKeyManager; +import java.net.Socket; +import java.security.Principal; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; + +/** + * An X509ExtendedKeyManager wrapper which always chooses and only + * returns the given alias, and defers retrieval to the delegate + * key manager. + */ +public class X509AliasKeyManager extends X509ExtendedKeyManager { + + private X509ExtendedKeyManager delegate; + private String alias; + + public X509AliasKeyManager(String alias, X509ExtendedKeyManager delegate) throws IllegalArgumentException { + if (alias == null) { + throw new IllegalArgumentException("The given key alias must not be null."); + } + + this.alias = alias; + this.delegate = delegate; + } + + @Override + public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) { + return alias; + } + + @Override + public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) { + return alias; + } + + @Override + public X509Certificate[] getCertificateChain(String alias) { + return delegate.getCertificateChain(alias); + } + + @Override + public String[] getClientAliases(String keyType, Principal[] issuers) { + return new String[]{alias}; + } + + @Override + public PrivateKey getPrivateKey(String alias) { + return delegate.getPrivateKey(alias); + } + + @Override + public String[] getServerAliases(String keyType, Principal[] issuers) { + return new String[]{alias}; + } + + @Override + public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) { + return alias; + } + + @Override + public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) { + return alias; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java new file mode 100644 index 0000000..bb71746 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +/** + * Defines a result interface for Asynchronous operations. + */ +public interface AsyncResult { + + /** + * If the operation fails this method is invoked with the Exception + * that caused the failure. + * + * @param result The error that resulted in this asynchronous operation failing. + */ + void onFailure(Throwable result); + + /** + * If the operation succeeds the resulting value produced is set to null and + * the waiting parties are signaled. + */ + void onSuccess(); + + /** + * Returns true if the AsyncResult has completed. The task is considered complete + * regardless if it succeeded or failed. + * + * @return returns true if the asynchronous operation has completed. + */ + boolean isComplete(); + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java new file mode 100644 index 0000000..12d38fd --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Asynchronous Client Future class. + */ +public class ClientFuture implements AsyncResult { + + private final AtomicBoolean completer = new AtomicBoolean(); + private final CountDownLatch latch = new CountDownLatch(1); + private final ClientFutureSynchronization synchronization; + private volatile Throwable error; + + public ClientFuture() { + this(null); + } + + public ClientFuture(ClientFutureSynchronization synchronization) { + this.synchronization = synchronization; + } + + @Override + public boolean isComplete() { + return latch.getCount() == 0; + } + + @Override + public void onFailure(Throwable result) { + if (completer.compareAndSet(false, true)) { + error = result; + if (synchronization != null) { + synchronization.onPendingFailure(error); + } + latch.countDown(); + } + } + + @Override + public void onSuccess() { + if (completer.compareAndSet(false, true)) { + if (synchronization != null) { + synchronization.onPendingSuccess(); + } + latch.countDown(); + } + } + + /** + * Timed wait for a response to a pending operation. + * + * @param amount The amount of time to wait before abandoning the wait. + * @param unit The unit to use for this wait period. + * @throws IOException if an error occurs while waiting for the response. + */ + public void sync(long amount, TimeUnit unit) throws IOException { + try { + latch.await(amount, unit); + } + catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + + failOnError(); + } + + /** + * Waits for a response to some pending operation. + * + * @throws IOException if an error occurs while waiting for the response. + */ + public void sync() throws IOException { + try { + latch.await(); + } + catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + + failOnError(); + } + + private void failOnError() throws IOException { + Throwable cause = error; + if (cause != null) { + throw IOExceptionSupport.create(cause); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java new file mode 100644 index 0000000..e279bc1 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +/** + * Synchronization callback interface used to execute state updates + * or similar tasks in the thread context where the associated + * ProviderFuture is managed. + */ +public interface ClientFutureSynchronization { + + void onPendingSuccess(); + + void onPendingFailure(Throwable cause); + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java new file mode 100644 index 0000000..70d88e6 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.io.IOException; + +/** + * Used to make throwing IOException instances easier. + */ +public class IOExceptionSupport { + + /** + * Checks the given cause to determine if it's already an IOException type and + * if not creates a new IOException to wrap it. + * + * @param cause The initiating exception that should be cast or wrapped. + * @return an IOException instance. + */ + public static IOException create(Throwable cause) { + if (cause instanceof IOException) { + return (IOException) cause; + } + + String message = cause.getMessage(); + if (message == null || message.length() == 0) { + message = cause.toString(); + } + + return new IOException(message, cause); + } +}
