https://issues.apache.org/jira/browse/AMQ-6339
Add support for AMQP client to connect using WebSockets. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/31c55f75 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/31c55f75 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/31c55f75 Branch: refs/heads/master Commit: 31c55f75108b06020eb6206c52361b04f49656a9 Parents: 83827f2 Author: Timothy Bish <[email protected]> Authored: Fri Jun 17 16:26:52 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu Jun 30 14:52:40 2016 -0400 ---------------------------------------------------------------------- activemq-amqp/pom.xml | 45 ++ .../transport/amqp/AmqpProtocolConverter.java | 8 +- .../transport/amqp/AmqpWSTransport.java | 143 ++++++ .../transport/amqp/AmqpWSTransportFactory.java | 88 ++++ .../transport/amqp/protocol/AmqpConnection.java | 7 +- .../transport/amqp/sasl/AmqpAuthenticator.java | 4 +- .../org/apache/activemq/transport/amqp+ws | 17 + .../org/apache/activemq/transport/amqp+wss | 17 + .../transport/amqp/AmqpAndMqttTest.java | 137 ++++++ .../transport/amqp/AmqpTestSupport.java | 49 ++ .../transport/amqp/JMSClientSslTest.java | 13 - .../activemq/transport/amqp/JMSClientTest.java | 6 +- .../transport/amqp/client/AmqpClient.java | 2 +- .../amqp/client/AmqpClientTestSupport.java | 36 +- .../transport/amqp/client/AmqpConnection.java | 10 +- .../client/transport/NettyTcpTransport.java | 401 ++++++++++++++++ .../amqp/client/transport/NettyTransport.java | 370 +-------------- .../client/transport/NettyTransportFactory.java | 17 +- .../transport/NettyTransportSslOptions.java | 2 +- .../amqp/client/transport/NettyWSTransport.java | 470 +++++++++++++++++++ .../AmqpBrokerReuqestedHearbeatsTest.java | 18 + .../AmqpClientRequestsHeartbeatsTest.java | 18 + .../amqp/interop/AmqpConnectionsTest.java | 20 + .../src/test/resources/log4j.properties | 2 +- .../activemq/transport/vm/VMTransport.java | 17 + .../apache/activemq/transport/Transport.java | 25 + .../activemq/transport/TransportFilter.java | 42 +- .../transport/failover/FailoverTransport.java | 28 +- .../transport/fanout/FanoutTransport.java | 71 +-- .../activemq/transport/mock/MockTransport.java | 70 ++- .../activemq/transport/tcp/SslTransport.java | 11 +- .../activemq/transport/tcp/TcpTransport.java | 19 +- .../activemq/transport/udp/UdpTransport.java | 25 +- .../activemq/transport/ws/WSTransport.java | 95 ++++ activemq-http/pom.xml | 2 +- .../transport/http/BlockingQueueTransport.java | 27 +- .../transport/http/HttpClientTransport.java | 15 + .../transport/util/HttpTransportUtils.java | 21 + .../transport/ws/AbstractStompSocket.java | 8 +- .../transport/ws/StompWSConnection.java | 11 +- .../transport/ws/WSTransportFactory.java | 12 +- .../activemq/transport/ws/WSTransportProxy.java | 270 +++++++++++ .../transport/ws/WSTransportServer.java | 19 +- .../activemq/transport/ws/jetty9/WSServlet.java | 127 ++++- .../transport/wss/WSSTransportFactory.java | 12 +- .../activemq/transport/ws/MQTTWSConnection.java | 10 +- .../ws/StompWSConnectionTimeoutTest.java | 4 - .../transport/ws/WSTransportTestSupport.java | 16 +- .../src/test/resources/log4j.properties | 4 +- activemq-unit-tests/pom.xml | 4 - .../activemq/conversions/AmqpAndMqttTest.java | 136 ------ .../activemq/transport/StubTransport.java | 25 +- .../auto/AutoTransportConfigureTest.java | 30 +- pom.xml | 2 +- 54 files changed, 2362 insertions(+), 696 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml index 7c8320f..dba2dac 100644 --- a/activemq-amqp/pom.xml +++ b/activemq-amqp/pom.xml @@ -94,6 +94,21 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-http</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-mqtt</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.eclipse.jetty.websocket</groupId> + <artifactId>websocket-server</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <scope>test</scope> @@ -123,6 +138,36 @@ <artifactId>slf4j-log4j12</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + <version>${netty-all-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-common</artifactId> + <version>${netty-all-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + <version>${netty-all-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + <version>${netty-all-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + <version>${netty-all-version}</version> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 6b9a178..9234bf5 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -22,13 +22,13 @@ import org.apache.activemq.command.Command; /** * Interface that defines the API for any AMQP protocol converter ised to - * map AMQP mechanincs to ActiveMQ and back. + * map AMQP mechanics to ActiveMQ and back. */ public interface AmqpProtocolConverter { /** * A new incoming data packet from the remote peer is handed off to the - * protocol converter for porcessing. The type can vary and be either an + * protocol converter for processing. The type can vary and be either an * AmqpHeader at the handshake phase or a byte buffer containing the next * incoming frame data from the remote. * @@ -70,9 +70,9 @@ public interface AmqpProtocolConverter { * empty frames or closing connections due to remote end being inactive * for to long. * - * @returns the amount of milliseconds to wait before performaing another check. + * @returns the amount of milliseconds to wait before performing another check. * - * @throws IOException if an error occurs on writing heatbeats to the wire. + * @throws IOException if an error occurs on writing heart-beats to the wire. */ long keepAlive() throws IOException; http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java new file mode 100644 index 0000000..2ec3a09 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.security.cert.X509Certificate; + +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.transport.amqp.AmqpFrameParser.AMQPFrameSink; +import org.apache.activemq.transport.ws.WSTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; + +/** + * An AMQP based WebSocket transport implementation. + */ +public class AmqpWSTransport extends TransportSupport implements WSTransport, AMQPFrameSink { + + private final AmqpFrameParser frameReader = new AmqpFrameParser(this); + private final URI remoteLocation; + + private WSTransportSink outputSink; + private int receiveCounter; + private X509Certificate[] certificates; + + /** + * Create a new Transport instance. + * + * @param location + * the remote location where the client connection is from. + * @param wireFormat + * the WireFormat instance that configures this Transport. + */ + public AmqpWSTransport(URI location, WireFormat wireFormat) { + super(); + + remoteLocation = location; + frameReader.setWireFormat((AmqpWireFormat) wireFormat); + } + + @Override + public void setTransportSink(WSTransportSink outputSink) { + this.outputSink = outputSink; + } + + @Override + public void oneway(Object command) throws IOException { + if (command instanceof ByteBuffer) { + outputSink.onSocketOutboundBinary((ByteBuffer) command); + } else { + throw new IOException("Unexpected output command."); + } + } + + @Override + public String getRemoteAddress() { + return remoteLocation.toASCIIString(); + } + + @Override + public int getReceiveCounter() { + return receiveCounter; + } + + @Override + public X509Certificate[] getPeerCertificates() { + return certificates; + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + this.certificates = certificates; + } + + @Override + public String getSubProtocol() { + return "amqp"; + } + + @Override + public WireFormat getWireFormat() { + return frameReader.getWireFormat(); + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + // Currently nothing needed here since we have no async workers. + } + + @Override + protected void doStart() throws Exception { + if (outputSink == null) { + throw new IllegalStateException("Transport started before output sink assigned."); + } + + // Currently nothing needed here since we have no async workers. + } + + //----- WebSocket event hooks --------------------------------------------// + + @Override + public void onWebSocketText(String data) throws IOException { + onException(new IOException("Illegal text content receive on AMQP WebSocket channel.")); + } + + @Override + public void onWebSocketBinary(ByteBuffer data) throws IOException { + try { + frameReader.parse(data); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } + } + + @Override + public void onWebSocketClosed() throws IOException { + onException(new IOException("Unexpected close of AMQP WebSocket channel.")); + } + + //----- AMQP Frame Data event hook ---------------------------------------// + + @Override + public void onFrame(Object frame) { + doConsume(frame); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java new file mode 100644 index 0000000..b85a827 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.Map; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * Factory for creating WebSocket aware AMQP Transports. + */ +public class AmqpWSTransportFactory extends TransportFactory implements BrokerServiceAware { + + private BrokerService brokerService = null; + + @Override + protected String getDefaultWireFormatType() { + return "amqp"; + } + + @Override + public TransportServer doBind(URI location) throws IOException { + throw new IOException("doBind() method not implemented! No Server over WS implemented."); + } + + @Override + @SuppressWarnings("rawtypes") + public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { + AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService); + + Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat."); + + IntrospectionSupport.setProperties(amqpTransport, options); + IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions); + + // Now wrap the filter with the monitor + transport = createInactivityMonitor(amqpTransport, format); + IntrospectionSupport.setProperties(transport, options); + + return super.compositeConfigure(transport, format, options); + } + + /** + * Factory method to create a new transport + * + * @throws IOException + * @throws UnknownHostException + */ + @Override + protected Transport createTransport(URI location, WireFormat wireFormat) throws MalformedURLException, UnknownHostException, IOException { + return new AmqpWSTransport(location, wireFormat); + } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + protected Transport createInactivityMonitor(AmqpTransportFilter transport, WireFormat format) { + AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format); + transport.setInactivityMonitor(monitor); + return monitor; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index 8b86132..aa9b577 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -309,7 +309,7 @@ public class AmqpConnection implements AmqpProtocolConverter { while (!done) { ByteBuffer toWrite = protonTransport.getOutputBuffer(); if (toWrite != null && toWrite.hasRemaining()) { - LOG.trace("Sending {} bytes out", toWrite.limit()); + LOG.trace("Server: Sending {} bytes out", toWrite.limit()); amqpTransport.sendToAmqp(toWrite); protonTransport.outputConsumed(); } else { @@ -356,6 +356,8 @@ public class AmqpConnection implements AmqpProtocolConverter { return; } + LOG.trace("Server: Received from client: {} bytes", frame.getLength()); + while (frame.length > 0) { try { int count = protonTransport.input(frame.data, frame.offset, frame.length); @@ -386,7 +388,7 @@ public class AmqpConnection implements AmqpProtocolConverter { Event event = null; while ((event = eventCollector.peek()) != null) { if (amqpTransport.isTrace()) { - LOG.trace("Processing event: {}", event.getType()); + LOG.trace("Server: Processing event: {}", event.getType()); } switch (event.getType()) { case CONNECTION_REMOTE_OPEN: @@ -484,7 +486,6 @@ public class AmqpConnection implements AmqpProtocolConverter { protonConnection.close(); } else { - if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) { LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout()); protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout()); http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java index 379c5f9..2309374 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java @@ -54,14 +54,14 @@ public class AmqpAuthenticator { } /** - * @return true if the SASL exchange has conpleted, regardless of success. + * @return true if the SASL exchange has completed, regardless of success. */ public boolean isDone() { return sasl.getOutcome() != Sasl.SaslOutcome.PN_SASL_NONE; } /** - * @return the list of all SASL mechanisms that are supported curretnly. + * @return the list of all SASL mechanisms that are supported currently. */ public String[] getSupportedMechanisms() { return mechanisms; http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws new file mode 100644 index 0000000..cfadf68 --- /dev/null +++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpWSTransportFactory http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss new file mode 100644 index 0000000..cfadf68 --- /dev/null +++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.amqp.AmqpWSTransportFactory http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java new file mode 100644 index 0000000..ac7a199 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AmqpAndMqttTest { + + protected BrokerService broker; + private TransportConnector amqpConnector; + private TransportConnector mqttConnector; + + @Before + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setAdvisorySupport(false); + broker.setSchedulerSupport(false); + + amqpConnector = broker.addConnector("amqp://0.0.0.0:0"); + mqttConnector = broker.addConnector("mqtt://0.0.0.0:0"); + + return broker; + } + + @Test(timeout = 60000) + public void testFromMqttToAmqp() throws Exception { + Connection amqp = createAmqpConnection(); + Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createTopic("topic://FOO")); + + final BlockingConnection mqtt = createMQTTConnection().blockingConnection(); + mqtt.connect(); + byte[] payload = bytes("Hello World"); + mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false); + mqtt.disconnect(); + + Message msg = consumer.receive(1000 * 5); + assertNotNull(msg); + assertTrue(msg instanceof BytesMessage); + + BytesMessage bmsg = (BytesMessage) msg; + byte[] actual = new byte[(int) bmsg.getBodyLength()]; + bmsg.readBytes(actual); + assertTrue(Arrays.equals(actual, payload)); + amqp.close(); + } + + private byte[] bytes(String value) { + try { + return value.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + protected MQTT createMQTTConnection() throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setConnectAttemptsMax(1); + mqtt.setReconnectAttemptsMax(0); + mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort()); + return mqtt; + } + + public Connection createAmqpConnection() throws Exception { + + String amqpURI = "amqp://localhost:" + amqpConnector.getConnectUri().getPort(); + + final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI); + + factory.setUsername("admin"); + factory.setPassword("password"); + + final Connection connection = factory.createConnection(); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + connection.start(); + return connection; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index af6a63f..12c0a17 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp; import java.io.File; import java.io.IOException; +import java.net.ServerSocket; import java.net.URI; import java.security.SecureRandom; import java.util.Set; @@ -33,6 +34,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.net.ServerSocketFactory; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -79,6 +81,11 @@ public class AmqpTestSupport { protected URI amqpNioPlusSslURI; protected int amqpNioPlusSslPort; + protected URI amqpWsURI; + protected int amqpWsPort; + protected URI amqpWssURI; + protected int amqpWssPort; + protected URI autoURI; protected int autoPort; protected URI autoSslURI; @@ -213,6 +220,20 @@ public class AmqpTestSupport { autoNioPlusSslURI = connector.getPublishableConnectURI(); LOG.debug("Using auto+nio+ssl port " + autoNioPlusSslPort); } + if (isUseWsConnector()) { + connector = brokerService.addConnector( + "ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + amqpWsPort = connector.getConnectUri().getPort(); + amqpWsURI = connector.getPublishableConnectURI(); + LOG.debug("Using amqp+ws port " + amqpWsPort); + } + if (isUseWssConnector()) { + connector = brokerService.addConnector( + "wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + amqpWssPort = connector.getConnectUri().getPort(); + amqpWssURI = connector.getPublishableConnectURI(); + LOG.debug("Using amqp+wss port " + amqpWssPort); + } } protected boolean isPersistent() { @@ -263,6 +284,14 @@ public class AmqpTestSupport { return false; } + protected boolean isUseWsConnector() { + return false; + } + + protected boolean isUseWssConnector() { + return false; + } + protected String getAmqpTransformer() { return "jms"; } @@ -355,6 +384,26 @@ public class AmqpTestSupport { return name.getMethodName(); } + protected int getProxyPort(int proxyPort) { + if (proxyPort == 0) { + ServerSocket ss = null; + try { + ss = ServerSocketFactory.getDefault().createServerSocket(0); + proxyPort = ss.getLocalPort(); + } catch (IOException e) { // ignore + } finally { + try { + if (ss != null ) { + ss.close(); + } + } catch (IOException e) { // ignore + } + } + } + + return proxyPort; + } + protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException { ObjectName brokerViewMBean = new ObjectName( "org.apache.activemq:type=Broker,brokerName=localhost"); http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java index a8acb7d..b0bd3a7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java @@ -17,13 +17,7 @@ package org.apache.activemq.transport.amqp; import java.net.URI; -import java.security.SecureRandom; -import javax.net.ssl.KeyManager; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; - -import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,13 +27,6 @@ import org.slf4j.LoggerFactory; public class JMSClientSslTest extends JMSClientTest { protected static final Logger LOG = LoggerFactory.getLogger(JMSClientSslTest.class); - @BeforeClass - public static void beforeClass() throws Exception { - SSLContext ctx = SSLContext.getInstance("TLS"); - ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); - SSLContext.setDefault(ctx); - } - @Override protected URI getBrokerURI() { return amqpSslURI; http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 647ff13..87dc9ee 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -57,7 +57,7 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin; import org.apache.activemq.util.Wait; -import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Test; import org.objectweb.jtests.jms.framework.TestConfig; import org.slf4j.Logger; @@ -1180,8 +1180,8 @@ public class JMSClientTest extends JMSClientTestSupport { @Test(timeout = 60000) public void testZeroPrefetchWithTwoConsumers() throws Exception { - connection = createConnection(); - ((JmsConnection)connection).getPrefetchPolicy().setAll(0); + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0")); + connection = cf.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java index 738b0eb..78b1aa0 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -21,8 +21,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.activemq.transport.amqp.client.transport.NettyTransport; import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory; +import org.apache.activemq.transport.amqp.client.transport.NettyTransport; import org.apache.qpid.proton.amqp.Symbol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java index 5504954..123f86c 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java @@ -51,12 +51,12 @@ public class AmqpClientTestSupport extends AmqpTestSupport { @Override protected boolean isUseTcpConnector() { - return !isUseSSL() && !connectorScheme.contains("nio"); + return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws"); } @Override protected boolean isUseSslConnector() { - return isUseSSL() && !connectorScheme.contains("nio"); + return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss"); } @Override @@ -69,13 +69,33 @@ public class AmqpClientTestSupport extends AmqpTestSupport { return isUseSSL() && connectorScheme.contains("nio"); } + @Override + protected boolean isUseWsConnector() { + return !isUseSSL() && connectorScheme.contains("ws"); + } + + @Override + protected boolean isUseWssConnector() { + return isUseSSL() && connectorScheme.contains("wss"); + } + public URI getBrokerAmqpConnectionURI() { + boolean webSocket = false; + try { int port = 0; switch (connectorScheme) { case "amqp": port = this.amqpPort; break; + case "amqp+ws": + port = this.amqpWsPort; + webSocket = true; + break; + case "amqp+wss": + port = this.amqpWssPort; + webSocket = true; + break; case "amqp+ssl": port = this.amqpSslPort; break; @@ -92,9 +112,17 @@ public class AmqpClientTestSupport extends AmqpTestSupport { String uri = null; if (isUseSSL()) { - uri = "ssl://127.0.0.1:" + port; + if (webSocket) { + uri = "wss://127.0.0.1:" + port; + } else { + uri = "ssl://127.0.0.1:" + port; + } } else { - uri = "tcp://127.0.0.1:" + port; + if (webSocket) { + uri = "ws://127.0.0.1:" + port; + } else { + uri = "tcp://127.0.0.1:" + port; + } } if (!getAmqpConnectionURIOptions().isEmpty()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index acda553..85a1d22 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator; -import org.apache.activemq.transport.amqp.client.transport.NettyTransport; import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener; import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; @@ -79,7 +78,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements private final AtomicLong sessionIdGenerator = new AtomicLong(); private final AtomicLong txIdGenerator = new AtomicLong(); private final Collector protonCollector = new CollectorImpl(); - private final NettyTransport transport; + private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport; private final Transport protonTransport = Transport.Factory.create(); private final String username; @@ -103,7 +102,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; private long drainTimeout = DEFAULT_DRAIN_TIMEOUT; - public AmqpConnection(NettyTransport transport, String username, String password) { + public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) { setEndpoint(Connection.Factory.create()); getEndpoint().collect(protonCollector); @@ -490,7 +489,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements @Override public void run() { ByteBuffer source = incoming.nioBuffer(); - LOG.trace("Received from Broker {} bytes:", source.remaining()); + LOG.trace("Client Received from Broker {} bytes:", source.remaining()); if (protonTransport.isClosed()) { LOG.debug("Ignoring incoming data because transport is closed"); @@ -520,6 +519,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements @Override public void onTransportClosed() { LOG.debug("The transport has unexpectedly closed"); + failed(getOpenAbortException()); } @Override @@ -612,7 +612,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements Event protonEvent = null; while ((protonEvent = protonCollector.peek()) != null) { if (!protonEvent.getType().equals(Type.TRANSPORT)) { - LOG.trace("New Proton Event: {}", protonEvent.getType()); + LOG.trace("Client: New Proton Event: {}", protonEvent.getType()); } AmqpEventSink amqpEventSink = null; http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java new file mode 100644 index 0000000..886ed4b --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java @@ -0,0 +1,401 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import java.io.IOException; +import java.net.URI; +import java.security.Principal; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +/** + * TCP based transport that uses Netty as the underlying IO layer. + */ +public class NettyTcpTransport implements NettyTransport { + + private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class); + + private static final int QUIET_PERIOD = 20; + private static final int SHUTDOWN_TIMEOUT = 100; + + protected Bootstrap bootstrap; + protected EventLoopGroup group; + protected Channel channel; + protected NettyTransportListener listener; + protected NettyTransportOptions options; + protected final URI remote; + protected boolean secure; + + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final CountDownLatch connectLatch = new CountDownLatch(1); + private IOException failureCause; + private Throwable pendingFailure; + + /** + * Create a new transport instance + * + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) { + this(null, remoteLocation, options); + } + + /** + * Create a new transport instance + * + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { + this.options = options; + this.listener = listener; + this.remote = remoteLocation; + this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl"); + } + + @Override + public void connect() throws IOException { + + if (listener == null) { + throw new IllegalStateException("A transport listener must be set before connection attempts."); + } + + group = new NioEventLoopGroup(1); + + bootstrap = new Bootstrap(); + bootstrap.group(group); + bootstrap.channel(NioSocketChannel.class); + bootstrap.handler(new ChannelInitializer<Channel>() { + + @Override + public void initChannel(Channel connectedChannel) throws Exception { + configureChannel(connectedChannel); + } + }); + + configureNetty(bootstrap, getTransportOptions()); + + ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort()); + future.addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + handleConnected(future.channel()); + } else if (future.isCancelled()) { + connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); + } else { + connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); + } + } + }); + + try { + connectLatch.await(); + } catch (InterruptedException ex) { + LOG.debug("Transport connection was interrupted."); + Thread.interrupted(); + failureCause = IOExceptionSupport.create(ex); + } + + if (failureCause != null) { + // Close out any Netty resources now as they are no longer needed. + if (channel != null) { + channel.close().syncUninterruptibly(); + channel = null; + } + if (group != null) { + group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + group = null; + } + + throw failureCause; + } else { + // Connected, allow any held async error to fire now and close the transport. + channel.eventLoop().execute(new Runnable() { + + @Override + public void run() { + if (pendingFailure != null) { + channel.pipeline().fireExceptionCaught(pendingFailure); + } + } + }); + } + } + + @Override + public boolean isConnected() { + return connected.get(); + } + + @Override + public boolean isSSL() { + return secure; + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + connected.set(false); + if (channel != null) { + channel.close().syncUninterruptibly(); + } + if (group != null) { + group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + } + } + } + + @Override + public ByteBuf allocateSendBuffer(int size) throws IOException { + checkConnected(); + return channel.alloc().ioBuffer(size, size); + } + + @Override + public void send(ByteBuf output) throws IOException { + checkConnected(); + int length = output.readableBytes(); + if (length == 0) { + return; + } + + LOG.trace("Attempted write of: {} bytes", length); + + channel.writeAndFlush(output); + } + + @Override + public NettyTransportListener getTransportListener() { + return listener; + } + + @Override + public void setTransportListener(NettyTransportListener listener) { + this.listener = listener; + } + + @Override + public NettyTransportOptions getTransportOptions() { + if (options == null) { + if (isSSL()) { + options = NettyTransportSslOptions.INSTANCE; + } else { + options = NettyTransportOptions.INSTANCE; + } + } + + return options; + } + + @Override + public URI getRemoteLocation() { + return remote; + } + + @Override + public Principal getLocalPrincipal() { + if (!isSSL()) { + throw new UnsupportedOperationException("Not connected to a secure channel"); + } + + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + + return sslHandler.engine().getSession().getLocalPrincipal(); + } + + //----- Internal implementation details, can be overridden as needed --// + + protected String getRemoteHost() { + return remote.getHost(); + } + + protected int getRemotePort() { + int port = remote.getPort(); + + if (port <= 0) { + if (isSSL()) { + port = getSslOptions().getDefaultSslPort(); + } else { + port = getTransportOptions().getDefaultTcpPort(); + } + } + + return port; + } + + protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { + bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); + bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); + bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); + bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); + + if (options.getSendBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); + } + + if (options.getReceiveBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize()); + bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize())); + } + + if (options.getTrafficClass() != -1) { + bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); + } + } + + protected void configureChannel(final Channel channel) throws Exception { + if (isSSL()) { + SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); + sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() { + @Override + public void operationComplete(Future<Channel> future) throws Exception { + if (future.isSuccess()) { + LOG.trace("SSL Handshake has completed: {}", channel); + connectionEstablished(channel); + } else { + LOG.trace("SSL Handshake has failed: {}", channel); + connectionFailed(channel, IOExceptionSupport.create(future.cause())); + } + } + }); + + channel.pipeline().addLast(sslHandler); + } + + channel.pipeline().addLast(new NettyTcpTransportHandler()); + } + + protected void handleConnected(final Channel channel) throws Exception { + if (!isSSL()) { + connectionEstablished(channel); + } + } + + //----- State change handlers and checks ---------------------------------// + + /** + * Called when the transport has successfully connected and is ready for use. + */ + protected void connectionEstablished(Channel connectedChannel) { + channel = connectedChannel; + connected.set(true); + connectLatch.countDown(); + } + + /** + * Called when the transport connection failed and an error should be returned. + * + * @param failedChannel + * The Channel instance that failed. + * @param cause + * An IOException that describes the cause of the failed connection. + */ + protected void connectionFailed(Channel failedChannel, IOException cause) { + failureCause = IOExceptionSupport.create(cause); + channel = failedChannel; + connected.set(false); + connectLatch.countDown(); + } + + private NettyTransportSslOptions getSslOptions() { + return (NettyTransportSslOptions) getTransportOptions(); + } + + private void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } + + //----- Handle connection events -----------------------------------------// + + private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> { + + @Override + public void channelActive(ChannelHandlerContext context) throws Exception { + LOG.trace("Channel has become active! Channel is {}", context.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext context) throws Exception { + LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportClosed listener"); + listener.onTransportClosed(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { + LOG.trace("Exception on channel! Channel is {}", context.channel()); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportError listener"); + if (pendingFailure != null) { + listener.onTransportError(pendingFailure); + } else { + listener.onTransportError(cause); + } + } else { + // Hold the first failure for later dispatch if connect succeeds. + // This will then trigger disconnect using the first error reported. + if (pendingFailure != null) { + LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); + pendingFailure = cause; + } + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer); + listener.onData(buffer); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java index 4084780..48be3a2 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,375 +16,37 @@ */ package org.apache.activemq.transport.amqp.client.transport; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.FixedRecvByteBufAllocator; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; - import java.io.IOException; import java.net.URI; import java.security.Principal; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; /** - * TCP based transport that uses Netty as the underlying IO layer. + * */ -public class NettyTransport { - - private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class); - - private static final int QUIET_PERIOD = 20; - private static final int SHUTDOWN_TIMEOUT = 100; - - protected Bootstrap bootstrap; - protected EventLoopGroup group; - protected Channel channel; - protected NettyTransportListener listener; - protected NettyTransportOptions options; - protected final URI remote; - protected boolean secure; - - private final AtomicBoolean connected = new AtomicBoolean(); - private final AtomicBoolean closed = new AtomicBoolean(); - private final CountDownLatch connectLatch = new CountDownLatch(1); - private IOException failureCause; - private Throwable pendingFailure; - - /** - * Create a new transport instance - * - * @param remoteLocation - * the URI that defines the remote resource to connect to. - * @param options - * the transport options used to configure the socket connection. - */ - public NettyTransport(URI remoteLocation, NettyTransportOptions options) { - this(null, remoteLocation, options); - } - - /** - * Create a new transport instance - * - * @param listener - * the TransportListener that will receive events from this Transport. - * @param remoteLocation - * the URI that defines the remote resource to connect to. - * @param options - * the transport options used to configure the socket connection. - */ - public NettyTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { - this.options = options; - this.listener = listener; - this.remote = remoteLocation; - this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl"); - } - - public void connect() throws IOException { - - if (listener == null) { - throw new IllegalStateException("A transport listener must be set before connection attempts."); - } - - group = new NioEventLoopGroup(1); - - bootstrap = new Bootstrap(); - bootstrap.group(group); - bootstrap.channel(NioSocketChannel.class); - bootstrap.handler(new ChannelInitializer<Channel>() { - - @Override - public void initChannel(Channel connectedChannel) throws Exception { - configureChannel(connectedChannel); - } - }); - - configureNetty(bootstrap, getTransportOptions()); - - ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort()); - future.addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - handleConnected(future.channel()); - } else if (future.isCancelled()) { - connectionFailed(future.channel(), new IOException("Connection attempt was cancelled")); - } else { - connectionFailed(future.channel(), IOExceptionSupport.create(future.cause())); - } - } - }); - - try { - connectLatch.await(); - } catch (InterruptedException ex) { - LOG.debug("Transport connection was interrupted."); - Thread.interrupted(); - failureCause = IOExceptionSupport.create(ex); - } - - if (failureCause != null) { - // Close out any Netty resources now as they are no longer needed. - if (channel != null) { - channel.close().syncUninterruptibly(); - channel = null; - } - if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - group = null; - } - - throw failureCause; - } else { - // Connected, allow any held async error to fire now and close the transport. - channel.eventLoop().execute(new Runnable() { - - @Override - public void run() { - if (pendingFailure != null) { - channel.pipeline().fireExceptionCaught(pendingFailure); - } - } - }); - } - } - - public boolean isConnected() { - return connected.get(); - } - - public boolean isSSL() { - return secure; - } - - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - connected.set(false); - if (channel != null) { - channel.close().syncUninterruptibly(); - } - if (group != null) { - group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - } - } - } - - public ByteBuf allocateSendBuffer(int size) throws IOException { - checkConnected(); - return channel.alloc().ioBuffer(size, size); - } - - public void send(ByteBuf output) throws IOException { - checkConnected(); - int length = output.readableBytes(); - if (length == 0) { - return; - } - - LOG.trace("Attempted write of: {} bytes", length); - - channel.writeAndFlush(output); - } - - public NettyTransportListener getTransportListener() { - return listener; - } - - public void setTransportListener(NettyTransportListener listener) { - this.listener = listener; - } - - public NettyTransportOptions getTransportOptions() { - if (options == null) { - if (isSSL()) { - options = NettyTransportSslOptions.INSTANCE; - } else { - options = NettyTransportOptions.INSTANCE; - } - } - - return options; - } - - public URI getRemoteLocation() { - return remote; - } - - public Principal getLocalPrincipal() { - if (!isSSL()) { - throw new UnsupportedOperationException("Not connected to a secure channel"); - } - - SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - - return sslHandler.engine().getSession().getLocalPrincipal(); - } - - //----- Internal implementation details, can be overridden as needed --// - - protected String getRemoteHost() { - return remote.getHost(); - } - - protected int getRemotePort() { - int port = remote.getPort(); - - if (port <= 0) { - if (isSSL()) { - port = getSslOptions().getDefaultSslPort(); - } else { - port = getTransportOptions().getDefaultTcpPort(); - } - } - - return port; - } - - protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { - bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); - bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); - bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); - bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); - - if (options.getSendBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); - } - - if (options.getReceiveBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize()); - bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize())); - } - - if (options.getTrafficClass() != -1) { - bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); - } - } - - protected void configureChannel(final Channel channel) throws Exception { - if (isSSL()) { - SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); - sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() { - @Override - public void operationComplete(Future<Channel> future) throws Exception { - if (future.isSuccess()) { - LOG.trace("SSL Handshake has completed: {}", channel); - connectionEstablished(channel); - } else { - LOG.trace("SSL Handshake has failed: {}", channel); - connectionFailed(channel, IOExceptionSupport.create(future.cause())); - } - } - }); - - channel.pipeline().addLast(sslHandler); - } - - channel.pipeline().addLast(new NettyTcpTransportHandler()); - } +public interface NettyTransport { - protected void handleConnected(final Channel channel) throws Exception { - if (!isSSL()) { - connectionEstablished(channel); - } - } + void connect() throws IOException; - //----- State change handlers and checks ---------------------------------// + boolean isConnected(); - /** - * Called when the transport has successfully connected and is ready for use. - */ - protected void connectionEstablished(Channel connectedChannel) { - channel = connectedChannel; - connected.set(true); - connectLatch.countDown(); - } + boolean isSSL(); - /** - * Called when the transport connection failed and an error should be returned. - * - * @param failedChannel - * The Channel instance that failed. - * @param cause - * An IOException that describes the cause of the failed connection. - */ - protected void connectionFailed(Channel failedChannel, IOException cause) { - failureCause = IOExceptionSupport.create(cause); - channel = failedChannel; - connected.set(false); - connectLatch.countDown(); - } + void close() throws IOException; - private NettyTransportSslOptions getSslOptions() { - return (NettyTransportSslOptions) getTransportOptions(); - } + ByteBuf allocateSendBuffer(int size) throws IOException; - private void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); - } - } + void send(ByteBuf output) throws IOException; - //----- Handle connection events -----------------------------------------// + NettyTransportListener getTransportListener(); - private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> { + void setTransportListener(NettyTransportListener listener); - @Override - public void channelActive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has become active! Channel is {}", context.channel()); - } + NettyTransportOptions getTransportOptions(); - @Override - public void channelInactive(ChannelHandlerContext context) throws Exception { - LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportClosed listener"); - listener.onTransportClosed(); - } - } + URI getRemoteLocation(); - @Override - public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - LOG.trace("Exception on channel! Channel is {}", context.channel()); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportError listener"); - if (pendingFailure != null) { - listener.onTransportError(pendingFailure); - } else { - listener.onTransportError(cause); - } - } else { - // Hold the first failure for later dispatch if connect succeeds. - // This will then trigger disconnect using the first error reported. - if (pendingFailure != null) { - LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); - pendingFailure = cause; - } - } - } + Principal getLocalPrincipal(); - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { - LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer); - listener.onData(buffer); - } - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java index fd50890..cc47aa2 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java @@ -46,7 +46,7 @@ public final class NettyTransportFactory { remoteURI = PropertyUtil.replaceQuery(remoteURI, map); - if (!remoteURI.getScheme().equalsIgnoreCase("ssl")) { + if (!remoteURI.getScheme().equalsIgnoreCase("ssl") && !remoteURI.getScheme().equalsIgnoreCase("wss")) { transportOptions = NettyTransportOptions.INSTANCE.clone(); } else { transportOptions = NettyTransportSslOptions.INSTANCE.clone(); @@ -61,7 +61,20 @@ public final class NettyTransportFactory { throw new IllegalArgumentException(msg); } - NettyTransport result = new NettyTransport(remoteURI, transportOptions); + NettyTransport result = null; + + switch (remoteURI.getScheme().toLowerCase()) { + case "tcp": + case "ssl": + result = new NettyTcpTransport(remoteURI, transportOptions); + break; + case "ws": + case "wss": + result = new NettyWSTransport(remoteURI, transportOptions); + break; + default: + throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme()); + } return result; } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java index 92ffd3c..b01f884 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java @@ -30,7 +30,7 @@ public class NettyTransportSslOptions extends NettyTransportOptions { public static final String DEFAULT_STORE_TYPE = "jks"; public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS"; public static final boolean DEFAULT_TRUST_ALL = false; - public static final boolean DEFAULT_VERIFY_HOST = true; + public static final boolean DEFAULT_VERIFY_HOST = false; public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"})); public static final int DEFAULT_SSL_PORT = 5671;
