// Origin: activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.amqp.client.transport;

import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * Transport for communicating over WebSockets.
 * <p>
 * {@link #connect()} blocks the calling thread until both the socket level
 * connect and the WebSocket protocol handshake have finished. Channel events
 * are handled by the inner {@code NettyTcpTransportHandler}, so the state
 * fields it shares with the calling thread are declared {@code volatile}.
 */
public class NettyWSTransport implements NettyTransport {

    private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);

    private static final int QUIET_PERIOD = 20;
    private static final int SHUTDOWN_TIMEOUT = 100;

    protected Bootstrap bootstrap;
    protected EventLoopGroup group;
    // Assigned from channel callbacks, read by the thread calling connect()/send()/close().
    protected volatile Channel channel;
    protected NettyTransportListener listener;
    protected NettyTransportOptions options;
    protected final URI remote;
    protected boolean secure;

    private final AtomicBoolean connected = new AtomicBoolean();
    private final AtomicBoolean closed = new AtomicBoolean();

    // Written by channel callbacks and read by the connecting thread.
    private volatile ChannelPromise handshakeFuture;
    private volatile IOException failureCause;
    private volatile Throwable pendingFailure;

    /**
     * Create a new transport instance
     *
     * @param remoteLocation
     *        the URI that defines the remote resource to connect to.
     * @param options
     *        the transport options used to configure the socket connection.
     */
    public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
        this(null, remoteLocation, options);
    }

    /**
     * Create a new transport instance
     *
     * @param listener
     *        the TransportListener that will receive events from this Transport.
     * @param remoteLocation
     *        the URI that defines the remote resource to connect to.
     * @param options
     *        the transport options used to configure the socket connection.
     */
    public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
        this.options = options;
        this.listener = listener;
        this.remote = remoteLocation;
        // Constant-first comparison so a URI without a scheme is simply treated as non-secure.
        this.secure = "wss".equalsIgnoreCase(remoteLocation.getScheme());
    }

    /**
     * Connects to the remote peer and waits for the WebSocket handshake to finish.
     * <p>
     * If the connection attempt fails, is cancelled, or the waiting thread is
     * interrupted, the Netty channel and event loop group are released before the
     * failure is thrown.
     *
     * @throws IOException
     *         if the connection attempt fails or is interrupted.
     * @throws IllegalStateException
     *         if no transport listener has been set.
     */
    @Override
    public void connect() throws IOException {

        if (listener == null) {
            throw new IllegalStateException("A transport listener must be set before connection attempts.");
        }

        group = new NioEventLoopGroup(1);

        bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<Channel>() {

            @Override
            public void initChannel(Channel connectedChannel) throws Exception {
                configureChannel(connectedChannel);
            }
        });

        configureNetty(bootstrap, getTransportOptions());

        try {
            ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
            future.addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        handleConnected(future.channel());
                    } else if (future.isCancelled()) {
                        connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
                    } else {
                        connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
                    }
                }
            });

            // Use await() and not sync(): sync() rethrows the connect failure which
            // would skip the resource cleanup performed below.
            future.await();

            // Now wait for WS protocol level handshake completion
            ChannelPromise handshake = handshakeFuture;
            if (handshake != null) {
                handshake.await();
            }

            // The connect listener may not have recorded the failure; derive it
            // from the future so that a failed connect is never reported as success.
            if (failureCause == null && !future.isSuccess()) {
                if (future.isCancelled()) {
                    failureCause = new IOException("Connection attempt was cancelled");
                } else {
                    failureCause = IOExceptionSupport.create(future.cause());
                }
            }
        } catch (InterruptedException ex) {
            LOG.debug("Transport connection attempt was interrupted.");
            // Restore the interrupt status for the caller rather than clearing it.
            Thread.currentThread().interrupt();
            failureCause = IOExceptionSupport.create(ex);
        }

        if (failureCause != null) {
            // Close out any Netty resources now as they are no longer needed.
            if (channel != null) {
                channel.close().syncUninterruptibly();
                channel = null;
            }
            if (group != null) {
                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
                group = null;
            }

            throw failureCause;
        } else {
            // Connected, allow any held async error to fire now and close the transport.
            final Channel activeChannel = channel;
            if (activeChannel != null) {
                activeChannel.eventLoop().execute(new Runnable() {

                    @Override
                    public void run() {
                        Throwable held = pendingFailure;
                        if (held != null) {
                            activeChannel.pipeline().fireExceptionCaught(held);
                        }
                    }
                });
            }
        }
    }

    @Override
    public boolean isConnected() {
        return connected.get();
    }

    @Override
    public boolean isSSL() {
        return secure;
    }

    /**
     * Closes the channel and shuts down the event loop group. Only the first
     * call has any effect.
     */
    @Override
    public void close() throws IOException {
        if (closed.compareAndSet(false, true)) {
            connected.set(false);
            if (channel != null) {
                channel.close().syncUninterruptibly();
            }
            if (group != null) {
                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
            }
        }
    }

    /**
     * Allocates an I/O buffer of exactly the given size from the channel allocator.
     *
     * @throws IOException
     *         if the transport is not connected.
     */
    @Override
    public ByteBuf allocateSendBuffer(int size) throws IOException {
        checkConnected();
        return channel.alloc().ioBuffer(size, size);
    }

    /**
     * Writes the given buffer to the remote as a single binary WebSocket frame.
     * Buffers with no readable bytes are ignored.
     *
     * @throws IOException
     *         if the transport is not connected.
     */
    @Override
    public void send(ByteBuf output) throws IOException {
        checkConnected();
        int length = output.readableBytes();
        if (length == 0) {
            return;
        }

        LOG.trace("Attempted write of: {} bytes", length);

        channel.writeAndFlush(new BinaryWebSocketFrame(output));
    }

    @Override
    public NettyTransportListener getTransportListener() {
        return listener;
    }

    @Override
    public void setTransportListener(NettyTransportListener listener) {
        this.listener = listener;
    }

    /**
     * Returns the configured options, falling back to the shared default
     * instance (SSL or plain, depending on the URI scheme) when none were given.
     */
    @Override
    public NettyTransportOptions getTransportOptions() {
        if (options == null) {
            if (isSSL()) {
                options = NettyTransportSslOptions.INSTANCE;
            } else {
                options = NettyTransportOptions.INSTANCE;
            }
        }

        return options;
    }

    @Override
    public URI getRemoteLocation() {
        return remote;
    }

    /**
     * Returns the local principal of the SSL session.
     *
     * @throws UnsupportedOperationException
     *         if this transport is not using a secure scheme.
     * @throws IllegalStateException
     *         if no channel or SSL handler is available yet.
     */
    @Override
    public Principal getLocalPrincipal() {
        if (!isSSL()) {
            throw new UnsupportedOperationException("Not connected to a secure channel");
        }

        Channel current = channel;
        if (current == null) {
            throw new IllegalStateException("Transport has no active channel");
        }

        SslHandler sslHandler = current.pipeline().get(SslHandler.class);
        if (sslHandler == null) {
            throw new IllegalStateException("No SSL handler installed on the channel");
        }

        return sslHandler.engine().getSession().getLocalPrincipal();
    }

    //----- Internal implementation details, can be overridden as needed --//

    protected String getRemoteHost() {
        return remote.getHost();
    }

    /**
     * Returns the port from the remote URI, or the default SSL / TCP port from
     * the transport options when the URI does not carry one.
     */
    protected int getRemotePort() {
        int port = remote.getPort();

        if (port <= 0) {
            if (isSSL()) {
                port = getSslOptions().getDefaultSslPort();
            } else {
                port = getTransportOptions().getDefaultTcpPort();
            }
        }

        return port;
    }

    /**
     * Applies the socket level options to the bootstrap. Buffer sizes and the
     * traffic class are only applied when they are not {@code -1}.
     */
    protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);

        if (options.getSendBufferSize() != -1) {
            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
        }

        if (options.getReceiveBufferSize() != -1) {
            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
        }

        if (options.getTrafficClass() != -1) {
            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
        }
    }

    /**
     * Builds the channel pipeline: an optional SSL handler, the HTTP client codec
     * and aggregator needed for the WebSocket upgrade, then the transport handler.
     */
    protected void configureChannel(final Channel channel) throws Exception {
        if (isSSL()) {
            SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
                @Override
                public void operationComplete(Future<Channel> future) throws Exception {
                    if (future.isSuccess()) {
                        LOG.trace("SSL Handshake has completed: {}", channel);
                        connectionEstablished(channel);
                    } else {
                        LOG.trace("SSL Handshake has failed: {}", channel);
                        connectionFailed(channel, IOExceptionSupport.create(future.cause()));
                    }
                }
            });

            channel.pipeline().addLast(sslHandler);
        }

        channel.pipeline().addLast(new HttpClientCodec());
        channel.pipeline().addLast(new HttpObjectAggregator(8192));
        channel.pipeline().addLast(new NettyTcpTransportHandler());
    }

    /**
     * Called when the socket connect succeeds. For secure transports the
     * connection is instead marked established by the SSL handshake listener.
     */
    protected void handleConnected(final Channel channel) throws Exception {
        if (!isSSL()) {
            connectionEstablished(channel);
        }
    }

    //----- State change handlers and checks ---------------------------------//

    /**
     * Called when the transport has successfully connected and is ready for use.
     */
    protected void connectionEstablished(Channel connectedChannel) {
        LOG.info("WebSocket connectionEstablished! {}", connectedChannel);
        channel = connectedChannel;
        connected.set(true);
    }

    /**
     * Called when the transport connection failed and an error should be returned.
     *
     * @param failedChannel
     *        The Channel instance that failed.
     * @param cause
     *        An IOException that describes the cause of the failed connection.
     */
    protected void connectionFailed(Channel failedChannel, IOException cause) {
        failureCause = IOExceptionSupport.create(cause);
        channel = failedChannel;
        connected.set(false);

        // The promise may not exist yet (handler never added) or may already have
        // been completed by the handler, so fail it without throwing in either case.
        ChannelPromise handshake = handshakeFuture;
        if (handshake != null) {
            handshake.tryFailure(cause);
        }
    }

    private NettyTransportSslOptions getSslOptions() {
        return (NettyTransportSslOptions) getTransportOptions();
    }

    private void checkConnected() throws IOException {
        if (!connected.get()) {
            throw new IOException("Cannot send to a non-connected transport.");
        }
    }

    //----- Handle connection events -----------------------------------------//

    /**
     * Drives the WebSocket client handshake and dispatches channel events and
     * incoming frames to the transport listener.
     */
    private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> {

        private final WebSocketClientHandshaker handshaker;

        public NettyTcpTransportHandler() {
            handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders());
        }

        @Override
        public void handlerAdded(ChannelHandlerContext context) {
            LOG.trace("Handler has become added! Channel is {}", context.channel());
            handshakeFuture = context.newPromise();
        }

        @Override
        public void channelActive(ChannelHandlerContext context) throws Exception {
            LOG.trace("Channel has become active! Channel is {}", context.channel());
            handshaker.handshake(context.channel());
        }

        @Override
        public void channelInactive(ChannelHandlerContext context) throws Exception {
            LOG.trace("Channel has gone inactive! Channel is {}", context.channel());

            // Release a connect() call still waiting on the handshake; this is a
            // no-op when the handshake has already completed.
            handshakeFuture.tryFailure(new IOException("Channel closed before WebSocket handshake completed"));

            if (connected.compareAndSet(true, false) && !closed.get()) {
                LOG.trace("Firing onTransportClosed listener");
                listener.onTransportClosed();
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
            LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage());
            LOG.trace("Error Stack: ", cause);
            if (connected.compareAndSet(true, false) && !closed.get()) {
                LOG.trace("Firing onTransportError listener");
                if (pendingFailure != null) {
                    listener.onTransportError(pendingFailure);
                } else {
                    listener.onTransportError(cause);
                }
            } else {
                // Hold the first failure for later dispatch if connect succeeds.
                // This will then trigger disconnect using the first error reported.
                if (pendingFailure == null) {
                    LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
                    pendingFailure = cause;
                }
            }

            // Always release a pending connect(); no effect once the handshake is done.
            handshakeFuture.tryFailure(cause);
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
            LOG.trace("New data read: incoming: {}", message);

            Channel ch = ctx.channel();
            if (!handshaker.isHandshakeComplete()) {
                handshaker.finishHandshake(ch, (FullHttpResponse) message);
                LOG.info("WebSocket Client connected! {}", ctx.channel());
                handshakeFuture.setSuccess();
                return;
            }

            // We shouldn't get this since we handle the handshake previously.
            if (message instanceof FullHttpResponse) {
                FullHttpResponse response = (FullHttpResponse) message;
                throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
                    ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
            }

            WebSocketFrame frame = (WebSocketFrame) message;
            if (frame instanceof TextWebSocketFrame) {
                // Only binary frames are accepted; a text frame is treated as a protocol error.
                TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
                LOG.warn("WebSocket Client received message: {}", textFrame.text());
                ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
            } else if (frame instanceof BinaryWebSocketFrame) {
                BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
                LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
                listener.onData(binaryFrame.content());
            } else if (frame instanceof PongWebSocketFrame) {
                LOG.trace("WebSocket Client received pong");
            } else if (frame instanceof CloseWebSocketFrame) {
                LOG.trace("WebSocket Client received closing");
                ch.close();
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java index 75e30f9..e9d50b7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -32,14 +34,30 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.activemq.util.Wait; import org.apache.qpid.proton.engine.Connection; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * Test handling of heartbeats requested by the broker. 
*/ +@RunWith(Parameterized.class) public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport { private final int TEST_IDLE_TIMEOUT = 3000; + @Parameters(name="connector={0}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {"amqp", false}, + {"amqp+ws", false}, + }); + } + + public AmqpBrokerReuqestedHearbeatsTest(String connectorScheme, boolean secure) { + super(connectorScheme, secure); + } + @Override protected String getAdditionalConfig() { return "&transport.wireFormat.idleTimeout=" + TEST_IDLE_TIMEOUT; http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java index c7ab0cd..3c779a2 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -32,14 +34,30 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.activemq.util.Wait; import org.apache.qpid.proton.engine.Connection; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * Tests that cover broker behavior when the client requests heartbeats */ 
+@RunWith(Parameterized.class) public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport { private final int TEST_IDLE_TIMEOUT = 3000; + @Parameters(name="connector={0}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {"amqp", false}, + {"amqp+ws", false}, + }); + } + + public AmqpClientRequestsHeartbeatsTest(String connectorScheme, boolean secure) { + super(connectorScheme, secure); + } + @Override protected String getAdditionalConfig() { return "&transport.wireFormat.idleTimeout=0"; http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java index fa519ab..2d154e6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java @@ -25,6 +25,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import java.util.Arrays; +import java.util.Collection; import java.util.Map; import org.apache.activemq.transport.amqp.AmqpSupport; @@ -37,16 +39,34 @@ import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Connection; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * Test broker handling of AMQP connections with various configurations. 
*/ +@RunWith(Parameterized.class) public class AmqpConnectionsTest extends AmqpClientTestSupport { private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + @Parameters(name="{0}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {"amqp", false}, + {"amqp+ws", false}, + {"amqp+ssl", true}, + {"amqp+wss", true} + }); + } + + public AmqpConnectionsTest(String connectorScheme, boolean secure) { + super(connectorScheme, secure); + } + @Test(timeout = 60000) public void testCanConnect() throws Exception { AmqpClient client = createAmqpClient(); http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties index d25017d..f88b152 100755 --- a/activemq-amqp/src/test/resources/log4j.properties +++ b/activemq-amqp/src/test/resources/log4j.properties @@ -20,7 +20,7 @@ # log4j.rootLogger=WARN, console, file log4j.logger.org.apache.activemq=INFO -log4j.logger.org.apache.activemq.transport.amqp=DEBUG +log4j.logger.org.apache.activemq.transport.amqp=TRACE log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO log4j.logger.org.fusesource=INFO http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 9e13cf9..5f75a3c 100755 --- 
a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.vm; import java.io.IOException; import java.io.InterruptedIOException; import java.net.URI; +import java.security.cert.X509Certificate; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -34,6 +35,7 @@ import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -438,4 +440,19 @@ public class VMTransport implements Transport, Task { public int getReceiveCounter() { return receiveCounter; } + + @Override + public X509Certificate[] getPeerCertificates() { + return null; + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + + } + + @Override + public WireFormat getWireFormat() { + return null; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java b/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java index d24596a..2067c14 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java @@ -18,7 +18,10 @@ package org.apache.activemq.transport; import java.io.IOException; import java.net.URI; +import java.security.cert.X509Certificate; + import org.apache.activemq.Service; +import 
org.apache.activemq.wireformat.WireFormat; /** * Represents the client side of a transport allowing messages to be sent @@ -116,6 +119,7 @@ public interface Transport extends Service { * @return true if updating uris is supported */ boolean isUpdateURIsSupported(); + /** * reconnect to another location * @param uri @@ -139,4 +143,25 @@ public interface Transport extends Service { * @return a counter which gets incremented as data is read from the transport. */ int getReceiveCounter(); + + /** + * @return the Certificates provided by the peer, or null if not a secure channel. + */ + X509Certificate[] getPeerCertificates(); + + /** + * Sets the certificates provided by the connected peer. + * + * @param certificates + * the Certificates provided by the peer. + */ + void setPeerCertificates(X509Certificate[] certificates); + + /** + * Retrieves the WireFormat instance associated with this Transport instance. + * + * @return the WireFormat in use. + */ + WireFormat getWireFormat(); + } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java index b0fafe8..ce02a7a 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java @@ -18,9 +18,12 @@ package org.apache.activemq.transport; import java.io.IOException; import java.net.URI; +import java.security.cert.X509Certificate; + +import org.apache.activemq.wireformat.WireFormat; /** - * + * */ public class TransportFilter implements TransportListener, Transport { protected final Transport next; @@ -30,10 +33,12 @@ public class TransportFilter implements TransportListener, 
Transport { this.next = next; } + @Override public TransportListener getTransportListener() { return transportListener; } + @Override public void setTransportListener(TransportListener channelListener) { this.transportListener = channelListener; if (channelListener == null) { @@ -48,6 +53,7 @@ public class TransportFilter implements TransportListener, Transport { * @throws IOException * if the next channel has not been set. */ + @Override public void start() throws Exception { if (next == null) { throw new IOException("The next channel has not been set."); @@ -61,10 +67,12 @@ public class TransportFilter implements TransportListener, Transport { /** * @see org.apache.activemq.Service#stop() */ + @Override public void stop() throws Exception { next.stop(); } + @Override public void onCommand(Object command) { transportListener.onCommand(command); } @@ -81,34 +89,42 @@ public class TransportFilter implements TransportListener, Transport { return next.toString(); } + @Override public void oneway(Object command) throws IOException { next.oneway(command); } + @Override public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { return next.asyncRequest(command, null); } + @Override public Object request(Object command) throws IOException { return next.request(command); } + @Override public Object request(Object command, int timeout) throws IOException { return next.request(command, timeout); } + @Override public void onException(IOException error) { transportListener.onException(error); } + @Override public void transportInterupted() { transportListener.transportInterupted(); } + @Override public void transportResumed() { transportListener.transportResumed(); } + @Override public <T> T narrow(Class<T> target) { if (target.isAssignableFrom(getClass())) { return target.cast(this); @@ -116,6 +132,7 @@ public class TransportFilter implements TransportListener, Transport { return next.narrow(target); } + @Override public String 
getRemoteAddress() { return next.getRemoteAddress(); } @@ -124,35 +141,58 @@ public class TransportFilter implements TransportListener, Transport { * @return * @see org.apache.activemq.transport.Transport#isFaultTolerant() */ + @Override public boolean isFaultTolerant() { return next.isFaultTolerant(); } + @Override public boolean isDisposed() { return next.isDisposed(); } + @Override public boolean isConnected() { return next.isConnected(); } + @Override public void reconnect(URI uri) throws IOException { next.reconnect(uri); } + @Override public int getReceiveCounter() { return next.getReceiveCounter(); } + @Override public boolean isReconnectSupported() { return next.isReconnectSupported(); } + @Override public boolean isUpdateURIsSupported() { return next.isUpdateURIsSupported(); } + @Override public void updateURIs(boolean rebalance,URI[] uris) throws IOException { next.updateURIs(rebalance,uris); } + + @Override + public X509Certificate[] getPeerCertificates() { + return next.getPeerCertificates(); + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + next.setPeerCertificates(certificates); + } + + @Override + public WireFormat getWireFormat() { + return next.getWireFormat(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index f502179..a46b318 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor 
license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -26,6 +26,7 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -61,6 +62,7 @@ import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1448,4 +1450,28 @@ public class FailoverTransport implements CompositeTransport { public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) { this.warnAfterReconnectAttempts = warnAfterReconnectAttempts; } + + @Override + public X509Certificate[] getPeerCertificates() { + Transport transport = connectedTransport.get(); + if (transport != null) { + return transport.getPeerCertificates(); + } else { + return null; + } + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + } + + @Override + public WireFormat getWireFormat() { + Transport transport = connectedTransport.get(); + if (transport != null) { + return transport.getWireFormat(); + } else { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index 00ae7ae..d7f4f85 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -19,6 +19,7 @@ package org.apache.activemq.transport.fanout; import java.io.IOException; import java.io.InterruptedIOException; import java.net.URI; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; @@ -44,13 +45,12 @@ import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; +import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A Transport that fans out a connection to multiple brokers. - * - * */ public class FanoutTransport implements CompositeTransport { @@ -113,9 +113,9 @@ public class FanoutTransport implements CompositeTransport { @Override public void onCommand(Object o) { - Command command = (Command)o; + Command command = (Command) o; if (command.isResponse()) { - Integer id = new Integer(((Response)command).getCorrelationId()); + Integer id = new Integer(((Response) command).getCorrelationId()); RequestCounter rc = requestMap.get(id); if (rc != null) { if (rc.ackCount.decrementAndGet() <= 0) { @@ -191,7 +191,7 @@ public class FanoutTransport implements CompositeTransport { // Try to connect them up. 
Iterator<FanoutTransportHandler> iter = transports.iterator(); - for (int i = 0; iter.hasNext() && !disposed; i++) { + while (iter.hasNext() && !disposed) { long now = System.currentTimeMillis(); @@ -228,9 +228,9 @@ public class FanoutTransport implements CompositeTransport { } catch (Exception e) { LOG.debug("Connect fail to: " + uri + ", reason: " + e); - if( fanoutHandler.transport !=null ) { + if (fanoutHandler.transport != null) { ServiceSupport.dispose(fanoutHandler.transport); - fanoutHandler.transport=null; + fanoutHandler.transport = null; } if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) { @@ -256,14 +256,13 @@ public class FanoutTransport implements CompositeTransport { } } } + if (transports.size() == connectedCount || disposed) { reconnectMutex.notifyAll(); return false; } - } } - } try { @@ -292,7 +291,7 @@ public class FanoutTransport implements CompositeTransport { restoreTransport(th); } } - connected=true; + connected = true; } } @@ -307,7 +306,7 @@ public class FanoutTransport implements CompositeTransport { } started = false; disposed = true; - connected=false; + connected = false; for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { FanoutTransportHandler th = iter.next(); @@ -367,7 +366,7 @@ public class FanoutTransport implements CompositeTransport { @Override public void oneway(Object o) throws IOException { - final Command command = (Command)o; + final Command command = (Command) o; try { synchronized (reconnectMutex) { @@ -392,7 +391,7 @@ public class FanoutTransport implements CompositeTransport { } if (error instanceof IOException) { - throw (IOException)error; + throw (IOException) error; } throw IOExceptionSupport.create(error); } @@ -428,7 +427,6 @@ public class FanoutTransport implements CompositeTransport { primary.onException(e); } } - } } catch (InterruptedException e) { // Some one may be trying to stop our thread. 
@@ -443,13 +441,12 @@ public class FanoutTransport implements CompositeTransport { */ private boolean isFanoutCommand(Command command) { if (command.isMessage()) { - if( fanOutQueues ) { + if (fanOutQueues) { return true; } - return ((Message)command).getDestination().isTopic(); + return ((Message) command).getDestination().isTopic(); } - if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE || - command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) { + if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE || command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) { return false; } return true; @@ -491,7 +488,6 @@ public class FanoutTransport implements CompositeTransport { @Override public <T> T narrow(Class<T> target) { - if (target.isAssignableFrom(getClass())) { return target.cast(this); } @@ -509,7 +505,6 @@ public class FanoutTransport implements CompositeTransport { } return null; - } protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException { @@ -523,8 +518,7 @@ public class FanoutTransport implements CompositeTransport { } @Override - public void add(boolean reblance,URI uris[]) { - + public void add(boolean reblance, URI uris[]) { synchronized (reconnectMutex) { for (int i = 0; i < uris.length; i++) { URI uri = uris[i]; @@ -537,6 +531,7 @@ public class FanoutTransport implements CompositeTransport { break; } } + if (!match) { FanoutTransportHandler th = new FanoutTransportHandler(uri); transports.add(th); @@ -544,12 +539,10 @@ public class FanoutTransport implements CompositeTransport { } } } - } @Override - public void remove(boolean rebalance,URI uris[]) { - + public void remove(boolean rebalance, URI uris[]) { synchronized (reconnectMutex) { for (int i = 0; i < uris.length; i++) { URI uri = uris[i]; @@ -567,13 +560,11 @@ public class FanoutTransport implements CompositeTransport { } } } - } @Override public void reconnect(URI uri) throws IOException { - add(true,new 
URI[]{uri}); - + add(true, new URI[] { uri }); } @Override @@ -585,12 +576,12 @@ public class FanoutTransport implements CompositeTransport { public boolean isUpdateURIsSupported() { return true; } + @Override - public void updateURIs(boolean reblance,URI[] uris) throws IOException { - add(reblance,uris); + public void updateURIs(boolean reblance, URI[] uris) throws IOException { + add(reblance, uris); } - @Override public String getRemoteAddress() { if (primary != null) { @@ -625,7 +616,6 @@ public class FanoutTransport implements CompositeTransport { return disposed; } - @Override public boolean isConnected() { return connected; @@ -643,4 +633,19 @@ public class FanoutTransport implements CompositeTransport { } return rc; } + + @Override + public X509Certificate[] getPeerCertificates() { + return null; + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + + } + + @Override + public WireFormat getWireFormat() { + return null; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java index 60c94af..8b00e27 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
@@ -18,16 +18,16 @@ package org.apache.activemq.transport.mock; import java.io.IOException; import java.net.URI; +import java.security.cert.X509Certificate; + import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.wireformat.WireFormat; -/** - * - */ public class MockTransport extends DefaultTransportListener implements Transport { protected Transport next; @@ -37,8 +37,7 @@ public class MockTransport extends DefaultTransportListener implements Transport this.next = next; } - /** - */ + @Override public synchronized void setTransportListener(TransportListener channelListener) { this.transportListener = channelListener; if (channelListener == null) { @@ -50,8 +49,10 @@ public class MockTransport extends DefaultTransportListener implements Transport /** * @see org.apache.activemq.Service#start() - * @throws IOException if the next channel has not been set. + * @throws IOException + * if the next channel has not been set. */ + @Override public void start() throws Exception { if (getNext() == null) { throw new IOException("The next channel has not been set."); @@ -65,6 +66,7 @@ public class MockTransport extends DefaultTransportListener implements Transport /** * @see org.apache.activemq.Service#stop() */ + @Override public void stop() throws Exception { getNext().stop(); } @@ -84,6 +86,7 @@ public class MockTransport extends DefaultTransportListener implements Transport /** * @return Returns the packetListener. 
*/ + @Override public synchronized TransportListener getTransportListener() { return transportListener; } @@ -93,18 +96,22 @@ public class MockTransport extends DefaultTransportListener implements Transport return getNext().toString(); } + @Override public void oneway(Object command) throws IOException { getNext().oneway(command); } + @Override public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { return getNext().asyncRequest(command, null); } + @Override public Object request(Object command) throws IOException { return getNext().request(command); } + @Override public Object request(Object command, int timeout) throws IOException { return getNext().request(command, timeout); } @@ -114,6 +121,7 @@ public class MockTransport extends DefaultTransportListener implements Transport getTransportListener().onException(error); } + @Override public <T> T narrow(Class<T> target) { if (target.isAssignableFrom(getClass())) { return target.cast(this); @@ -131,6 +139,7 @@ public class MockTransport extends DefaultTransportListener implements Transport setNext(filter); } + @Override public String getRemoteAddress() { return getNext().getRemoteAddress(); } @@ -138,35 +147,58 @@ public class MockTransport extends DefaultTransportListener implements Transport /** * @see org.apache.activemq.transport.Transport#isFaultTolerant() */ + @Override public boolean isFaultTolerant() { return getNext().isFaultTolerant(); } - public boolean isDisposed() { - return getNext().isDisposed(); - } - - public boolean isConnected() { - return getNext().isConnected(); + @Override + public boolean isDisposed() { + return getNext().isDisposed(); } - public void reconnect(URI uri) throws IOException { - getNext().reconnect(uri); - } + @Override + public boolean isConnected() { + return getNext().isConnected(); + } + @Override + public void reconnect(URI uri) throws IOException { + getNext().reconnect(uri); + } + + @Override public int 
getReceiveCounter() { return getNext().getReceiveCounter(); } - + @Override public boolean isReconnectSupported() { return getNext().isReconnectSupported(); } + @Override public boolean isUpdateURIsSupported() { return getNext().isUpdateURIsSupported(); } - public void updateURIs(boolean reblance,URI[] uris) throws IOException { - getNext().updateURIs(reblance,uris); + + @Override + public void updateURIs(boolean reblance, URI[] uris) throws IOException { + getNext().updateURIs(reblance, uris); + } + + @Override + public X509Certificate[] getPeerCertificates() { + return getNext().getPeerCertificates(); + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + getNext().setPeerCertificates(certificates); + } + + @Override + public WireFormat getWireFormat() { + return getNext().getWireFormat(); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java index 2b3953f..0c2fab9 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.transport.tcp; import java.io.IOException; @@ -43,6 +42,7 @@ import org.apache.activemq.wireformat.WireFormat; * unexpected situations may occur. 
*/ public class SslTransport extends TcpTransport { + /** * Connect to a remote node such as a Broker. * @@ -56,6 +56,7 @@ public class SslTransport extends TcpTransport { * @throws UnknownHostException If TcpTransport throws. * @throws IOException If TcpTransport throws. */ + @SuppressWarnings({ "unchecked", "rawtypes" }) public SslTransport(WireFormat wireFormat, SSLSocketFactory socketFactory, URI remoteLocation, URI localLocation, boolean needClientAuth) throws IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); if (this.socket != null) { @@ -65,7 +66,7 @@ public class SslTransport extends TcpTransport { // a single proxy to route to different messaging apps. // On java 1.7 it seems like it can only be configured via reflection. - // todo: find out if this will work on java 1.8 + // TODO: find out if this will work on java 1.8 HashMap props = new HashMap(); props.put("host", remoteLocation.getHost()); IntrospectionSupport.setProperties(this.socket, props); @@ -110,6 +111,7 @@ public class SslTransport extends TcpTransport { /** * @return peer certificate chain associated with the ssl socket */ + @Override public X509Certificate[] getPeerCertificates() { SSLSocket sslSocket = (SSLSocket)this.socket; @@ -120,7 +122,7 @@ public class SslTransport extends TcpTransport { try { clientCertChain = (X509Certificate[])sslSession.getPeerCertificates(); } catch (SSLPeerUnverifiedException e) { - clientCertChain = null; + clientCertChain = null; } return clientCertChain; @@ -133,5 +135,4 @@ public class SslTransport extends TcpTransport { public String toString() { return "ssl://" + socket.getInetAddress() + ":" + socket.getPort(); } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java 
b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index 60fe283..04d1636 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -1,5 +1,5 @@ -/** -gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more +/* + * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 @@ -29,6 +29,7 @@ import java.net.SocketTimeoutException; import java.net.URI; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.security.cert.X509Certificate; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -51,12 +52,11 @@ import org.slf4j.LoggerFactory; /** * An implementation of the {@link Transport} interface using raw tcp/ip - * - * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) - * */ public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { + private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class); + protected final URI remoteLocation; protected final URI localLocation; protected final WireFormat wireFormat; @@ -754,4 +754,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S public WireFormat getWireFormat() { return wireFormat; } + + @Override + public X509Certificate[] getPeerCertificates() { + return null; + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java 
---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java index daa4860..d1ac088 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -20,7 +20,6 @@ import java.io.EOFException; import java.io.IOException; import java.net.BindException; import java.net.DatagramSocket; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketException; @@ -28,6 +27,7 @@ import java.net.URI; import java.net.UnknownHostException; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.DatagramChannel; +import java.security.cert.X509Certificate; import org.apache.activemq.Service; import org.apache.activemq.command.Command; @@ -47,10 +47,9 @@ import org.slf4j.LoggerFactory; /** * An implementation of the {@link Transport} interface using raw UDP - * - * */ public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable { + private static final Logger LOG = LoggerFactory.getLogger(UdpTransport.class); private static final int MAX_BIND_ATTEMPTS = 50; @@ -112,6 +111,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S /** * A one way asynchronous send */ + @Override public void oneway(Object command) throws IOException { oneway(command, targetAddress); } @@ -130,6 +130,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S /** * @return pretty print of 'this' */ 
+ @Override public String toString() { if (description != null) { return description + port; @@ -141,6 +142,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S /** * reads packets from a Socket */ + @Override public void run() { LOG.trace("Consumer thread starting for: " + toString()); while (!isStopped()) { @@ -350,6 +352,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S return host; } + @Override protected void doStart() throws Exception { getCommandChannel().start(); @@ -387,7 +390,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S // down // a previously bound socket, it can take a little while before we can // bind it again. - // + // for (int i = 0; i < MAX_BIND_ATTEMPTS; i++) { try { socket.bind(localAddress); @@ -419,6 +422,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S return new InetSocketAddress(port); } + @Override protected void doStop(ServiceStopper stopper) throws Exception { if (channel != null) { channel.close(); @@ -457,6 +461,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S } } + @Override public String getRemoteAddress() { if (targetAddress != null) { return "" + targetAddress; @@ -464,10 +469,20 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S return null; } + @Override public int getReceiveCounter() { if (commandChannel == null) { return 0; } return commandChannel.getReceiveCounter(); } + + @Override + public X509Certificate[] getPeerCertificates() { + return null; + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java ---------------------------------------------------------------------- diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java new file mode 100644 index 0000000..e15f86f --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.activemq.transport.Transport; + +/** + * Interface for a WebSocket Transport which provides hooks that a servlet can + * use to pass along WebSocket data and events. + */ +public interface WSTransport extends Transport { + + /** + * WS Transport output sink, used to give the WS Transport implementation + * a way to produce output back to the WS connection without coupling it + * to the implementation. + */ + public interface WSTransportSink { + + /** + * Called from the Transport when new outgoing String data is ready. + * + * @param data + * The newly prepared outgoing string data. + * + * @throws IOException if an error occurs or the socket doesn't support text data. 
+ */ + void onSocketOutboundText(String data) throws IOException; + + /** + * Called from the Transport when new outgoing binary data is ready. + * + * @param data + * The newly prepared outgoing binary data. + * + * @throws IOException if an error occurs or the socket doesn't support binary data. + */ + void onSocketOutboundBinary(ByteBuffer data) throws IOException; + } + + /** + * @return the WS sub-protocol that this transport is supplying. + */ + String getSubProtocol(); + + /** + * Called to provide the WS with the output data sink. + */ + void setTransportSink(WSTransportSink outputSink); + + /** + * Called from the WebSocket framework when new incoming String data is received. + * + * @param data + * The newly received incoming data. + * + * @throws IOException if an error occurs or the socket doesn't support text data. + */ + void onWebSocketText(String data) throws IOException; + + /** + * Called from the WebSocket framework when new incoming Binary data is received. + * + * @param data + * The newly received incoming data. + * + * @throws IOException if an error occurs or the socket doesn't support binary data. + */ + void onWebSocketBinary(ByteBuffer data) throws IOException; + + /** + * Called from the WebSocket framework when the socket has been closed unexpectedly. + * + * @throws IOException if an error occurs while processing the close. 
+ */ + void onWebSocketClosed() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml index 5b288c5..e15e076 100755 --- a/activemq-http/pom.xml +++ b/activemq-http/pom.xml @@ -228,7 +228,7 @@ </plugins> </build> </profile> - <profile> + <profile> <id>activemq.tests-autoTransport</id> <activation> <property> http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java index 4132c7c..fdf85b3 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java @@ -17,21 +17,22 @@ package org.apache.activemq.transport.http; import java.io.IOException; +import java.security.cert.X509Certificate; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.TransportSupport; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; /** * A server side HTTP based TransportChannel which processes incoming packets * and adds outgoing packets onto a {@link Queue} so that they can be dispatched * by the HTTP GET requests from the client. 
- * - * */ -public class BlockingQueueTransport extends TransportSupport { +public class BlockingQueueTransport extends TransportSupport { + public static final long MAX_TIMEOUT = 30000L; private BlockingQueue<Object> queue; @@ -44,6 +45,7 @@ public class BlockingQueueTransport extends TransportSupport { return queue; } + @Override public void oneway(Object command) throws IOException { try { boolean success = queue.offer(command, MAX_TIMEOUT, TimeUnit.MILLISECONDS); @@ -55,18 +57,35 @@ public class BlockingQueueTransport extends TransportSupport { } } - + @Override public String getRemoteAddress() { return "blockingQueue_" + queue.hashCode(); } + @Override protected void doStart() throws Exception { } + @Override protected void doStop(ServiceStopper stopper) throws Exception { } + @Override public int getReceiveCounter() { return 0; } + + @Override + public X509Certificate[] getPeerCertificates() { + return null; + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + } + + @Override + public WireFormat getWireFormat() { + return null; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java index c65dbb9..7f446c5 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java @@ -20,6 +20,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.net.URI; +import java.security.cert.X509Certificate; import java.util.zip.GZIPInputStream; import 
java.util.zip.GZIPOutputStream; @@ -30,6 +31,7 @@ import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; import org.apache.http.Header; import org.apache.http.HttpHost; import org.apache.http.HttpRequest; @@ -396,4 +398,17 @@ public class HttpClientTransport extends HttpTransportSupport { this.minSendAsCompressedSize = minSendAsCompressedSize; } + @Override + public X509Certificate[] getPeerCertificates() { + return null; + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + } + + @Override + public WireFormat getWireFormat() { + return getTextWireFormat(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java b/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java index 4daaf65..a8309b6 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java @@ -33,4 +33,25 @@ public class HttpTransportUtils { remoteAddress.append(request.getRemotePort()); return remoteAddress.toString(); } + + public static String generateWsRemoteAddress(HttpServletRequest request, String schemePrefix) { + if (request == null) { + throw new IllegalArgumentException("HttpServletRequest must not be null."); + } + + StringBuilder remoteAddress = new StringBuilder(); + String scheme = request.getScheme(); + if (scheme != null && scheme.equalsIgnoreCase("https")) { + scheme = schemePrefix + "+wss://"; + } else { + scheme = schemePrefix + 
"+ws://"; + } + + remoteAddress.append(scheme); + remoteAddress.append(request.getRemoteAddr()); + remoteAddress.append(":"); + remoteAddress.append(request.getRemotePort()); + + return remoteAddress.toString(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java index b8e0f8f..dd25a1d 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java @@ -147,7 +147,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St stompInactivityMonitor.onCommand(new KeepAliveInfo()); } else { StompFrame frame = (StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8"))); - frame.setTransportContext(getCertificates()); + frame.setTransportContext(getPeerCertificates()); protocolConverter.onStompCommand(frame); } } @@ -162,11 +162,13 @@ public abstract class AbstractStompSocket extends TransportSupport implements St return socketTransportStarted.getCount() == 0; } - public X509Certificate[] getCertificates() { + @Override + public X509Certificate[] getPeerCertificates() { return certificates; } - public void setCertificates(X509Certificate[] certificates) { + @Override + public void setPeerCertificates(X509Certificate[] certificates) { this.certificates = certificates; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java ---------------------------------------------------------------------- diff --git 
a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java index 3ded98f..340505a 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java @@ -129,10 +129,6 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList } } - - /* (non-Javadoc) - * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketClose(int, java.lang.String) - */ @Override public void onWebSocketClose(int statusCode, String reason) { LOG.trace("STOMP WS Connection closed, code:{} message:{}", statusCode, reason); @@ -140,15 +136,10 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList this.connection = null; this.closeCode = statusCode; this.closeMessage = reason; - } - /* (non-Javadoc) - * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketConnect(org.eclipse.jetty.websocket.api.Session) - */ @Override - public void onWebSocketConnect( - org.eclipse.jetty.websocket.api.Session session) { + public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) { this.connection = session; this.connectLatch.countDown(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java index bfcb5df..744685b 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java @@ -23,6 +23,8 @@ import 
java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.util.IOExceptionSupport; @@ -32,7 +34,9 @@ import org.apache.activemq.util.URISupport; /** * Factory for WebSocket (ws) transport */ -public class WSTransportFactory extends TransportFactory { +public class WSTransportFactory extends TransportFactory implements BrokerServiceAware { + + private BrokerService brokerService; @Override public TransportServer doBind(URI location) throws IOException { @@ -42,6 +46,7 @@ public class WSTransportFactory extends TransportFactory { Map<String, Object> httpOptions = IntrospectionSupport.extractProperties(options, "http."); Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, ""); IntrospectionSupport.setProperties(result, transportOptions); + result.setBrokerService(brokerService); result.setTransportOption(transportOptions); result.setHttpOptions(httpOptions); return result; @@ -49,4 +54,9 @@ public class WSTransportFactory extends TransportFactory { throw IOExceptionSupport.create(e); } } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java new file mode 100644 index 0000000..7d3ba18 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java @@ -0,0 +1,270 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.cert.X509Certificate; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.transport.ws.WSTransport.WSTransportSink; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A proxy class that manages sending WebSocket events to the wrapped protocol level + * WebSocket Transport. 
+ */ +public final class WSTransportProxy extends TransportSupport implements Transport, WebSocketListener, BrokerServiceAware, WSTransportSink { + + private static final Logger LOG = LoggerFactory.getLogger(WSTransportProxy.class); + + private final int ORDERLY_CLOSE_TIMEOUT = 10; + + private final ReentrantLock protocolLock = new ReentrantLock(); + private final CountDownLatch socketTransportStarted = new CountDownLatch(1); + private final String remoteAddress; + + private final Transport transport; + private final WSTransport wsTransport; + private Session session; + + /** + * Create a WebSocket Transport Proxy instance that will pass + * along WebSocket event to the underlying protocol level transport. + * + * @param remoteAddress + * the provided remote address to report being connected to. + * @param transport + * The protocol level WebSocket Transport + */ + public WSTransportProxy(String remoteAddress, Transport transport) { + this.remoteAddress = remoteAddress; + this.transport = transport; + this.wsTransport = transport.narrow(WSTransport.class); + + if (wsTransport == null) { + throw new IllegalArgumentException("Provided Transport does not contains a WSTransport implementation"); + } else { + wsTransport.setTransportSink(this); + } + } + + /** + * @return the sub-protocol of the proxied transport. + */ + public String getSubProtocol() { + return wsTransport.getSubProtocol(); + } + + /** + * Apply any configure Transport options on the wrapped Transport and its contained + * wireFormat instance. 
+ */ + public void setTransportOptions(Map<String, Object> options) { + Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat."); + + IntrospectionSupport.setProperties(transport, options); + IntrospectionSupport.setProperties(transport.getWireFormat(), wireFormatOptions); + } + + @Override + public void setBrokerService(BrokerService brokerService) { + if (transport instanceof BrokerServiceAware) { + ((BrokerServiceAware) transport).setBrokerService(brokerService); + } + } + + @Override + public void oneway(Object command) throws IOException { + protocolLock.lock(); + try { + transport.oneway(command); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } finally { + protocolLock.unlock(); + } + } + + @Override + public X509Certificate[] getPeerCertificates() { + return transport.getPeerCertificates(); + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + transport.setPeerCertificates(certificates); + } + + @Override + public String getRemoteAddress() { + return remoteAddress; + } + + @Override + public WireFormat getWireFormat() { + return transport.getWireFormat(); + } + + @Override + public int getReceiveCounter() { + return transport.getReceiveCounter(); + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + transport.stop(); + if (session != null && session.isOpen()) { + session.close(); + } + } + + @Override + protected void doStart() throws Exception { + socketTransportStarted.countDown(); + + transport.setTransportListener(getTransportListener()); + transport.start(); + } + + //----- WebSocket methods being proxied to the WS Transport --------------// + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int length) { + if (!transportStartedAtLeastOnce()) { + LOG.debug("Waiting for WebSocket to be properly started..."); + try { + socketTransportStarted.await(); + } catch (InterruptedException e) { + 
LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); + } + } + + protocolLock.lock(); + try { + wsTransport.onWebSocketBinary(ByteBuffer.wrap(payload, offset, length)); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } finally { + protocolLock.unlock(); + } + } + + @Override + public void onWebSocketText(String data) { + if (!transportStartedAtLeastOnce()) { + LOG.debug("Waiting for WebSocket to be properly started..."); + try { + socketTransportStarted.await(); + } catch (InterruptedException e) { + LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); + } + } + + protocolLock.lock(); + try { + wsTransport.onWebSocketText(data); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } finally { + protocolLock.unlock(); + } + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + try { + if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { + LOG.debug("WebSocket closed: code[{}] message[{}]", statusCode, reason); + wsTransport.onWebSocketClosed(); + } + } catch (Exception e) { + LOG.debug("Failed to close WebSocket cleanly", e); + } finally { + if (protocolLock.isHeldByCurrentThread()) { + protocolLock.unlock(); + } + } + } + + @Override + public void onWebSocketConnect(Session session) { + this.session = session; + } + + @Override + public void onWebSocketError(Throwable cause) { + onException(IOExceptionSupport.create(cause)); + } + + @Override + public void onSocketOutboundText(String data) throws IOException { + if (!transportStartedAtLeastOnce()) { + LOG.debug("Waiting for WebSocket to be properly started..."); + try { + socketTransportStarted.await(); + } catch (InterruptedException e) { + LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! 
Should be okay, but you could see race conditions..."); + } + } + + LOG.trace("WS Proxy sending string of size {} out", data.length()); + session.getRemote().sendString(data); + } + + @Override + public void onSocketOutboundBinary(ByteBuffer data) throws IOException { + if (!transportStartedAtLeastOnce()) { + LOG.debug("Waiting for WebSocket to be properly started..."); + try { + socketTransportStarted.await(); + } catch (InterruptedException e) { + LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); + } + } + + LOG.trace("WS Proxy sending {} bytes out", data.remaining()); + int limit = data.limit(); + session.getRemote().sendBytes(data); + + // Reset back to original limit and move position to match limit indicating + // that we read everything, the websocket sender clears the passed buffer + // which can make it look as if nothing was written. + data.limit(limit); + data.position(limit); + } + + //----- Internal implementation ------------------------------------------// + + private boolean transportStartedAtLeastOnce() { + return socketTransportStarted.getCount() == 0; + } +}
