Repository: activemq Updated Branches: refs/heads/trunk bc2e2d9a4 -> 3f8262507
[AMQ-5517] Runtime support for Jetty 9. Build/compile with Jetty8, but tests pass with Jetty 9 for runtime level support. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3f826250 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3f826250 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3f826250 Branch: refs/heads/trunk Commit: 3f826250775c6c022ebad1de57db11770c9cfe1a Parents: bc2e2d9 Author: Daniel Kulp <[email protected]> Authored: Tue Jan 13 12:47:53 2015 -0500 Committer: Daniel Kulp <[email protected]> Committed: Tue Jan 13 12:47:53 2015 -0500 ---------------------------------------------------------------------- activemq-http/pom.xml | 7 + .../transport/SecureSocketConnectorFactory.java | 131 ++++++++------- .../transport/SocketConnectorFactory.java | 17 +- .../transport/WebTransportServerSupport.java | 20 ++- .../discovery/http/EmbeddedJettyServer.java | 22 +-- .../transport/http/HttpTransportServer.java | 25 ++- .../https/Krb5AndCertsSslSocketConnector.java | 8 + .../activemq/transport/ws/MQTTSocket.java | 150 ----------------- .../activemq/transport/ws/StompSocket.java | 134 --------------- .../apache/activemq/transport/ws/WSServlet.java | 62 ------- .../transport/ws/WSTransportServer.java | 18 ++- .../transport/ws/jetty8/MQTTSocket.java | 150 +++++++++++++++++ .../transport/ws/jetty8/StompSocket.java | 134 +++++++++++++++ .../activemq/transport/ws/jetty8/WSServlet.java | 62 +++++++ .../transport/ws/jetty9/MQTTSocket.java | 161 +++++++++++++++++++ .../transport/ws/jetty9/StompSocket.java | 142 ++++++++++++++++ .../activemq/transport/ws/jetty9/WSServlet.java | 72 +++++++++ .../activemq/transport/ws/WSTransportTest.java | 16 +- .../transport/wss/WSSTransportTest.java | 22 +-- activemq-osgi/pom.xml | 2 +- activemq-web-console/pom.xml | 3 +- 21 files changed, 905 insertions(+), 453 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml index d1b6c02..96a22e1 100755 --- a/activemq-http/pom.xml +++ b/activemq-http/pom.xml @@ -108,6 +108,13 @@ <version>2.25.0</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.eclipse.jetty.websocket</groupId> + <artifactId>websocket-server</artifactId> + <version>${jetty9-version}</version> + <scope>provided</scope> + <optional>true</optional> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java index 6c98cac..3ac922a 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/SecureSocketConnectorFactory.java @@ -23,8 +23,6 @@ import org.apache.activemq.transport.https.Krb5AndCertsSslSocketConnector; import org.apache.activemq.util.IntrospectionSupport; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ssl.SslConnector; -import org.eclipse.jetty.server.ssl.SslSelectChannelConnector; import org.eclipse.jetty.util.ssl.SslContextFactory; public class SecureSocketConnectorFactory extends SocketConnectorFactory { @@ -44,76 +42,101 @@ public class SecureSocketConnectorFactory extends SocketConnectorFactory { private String auth; private SslContext context; - + private SslContextFactory contextFactory; + + public SecureSocketConnectorFactory() { + + } public SecureSocketConnectorFactory(SslContext context) { this.context = context; } + public SecureSocketConnectorFactory(SslContextFactory contextFactory) { + this.contextFactory = contextFactory; + } + @Override public Connector createConnector(Server server) throws Exception { - IntrospectionSupport.setProperties(this, getTransportOptions()); - SslConnector sslConnector; - if (Krb5AndCertsSslSocketConnector.isKrb(auth)) { - sslConnector = new Krb5AndCertsSslSocketConnector(); - ((Krb5AndCertsSslSocketConnector)sslConnector).setMode(auth); - } else { - sslConnector = new SslSelectChannelConnector(); + if (getTransportOptions() != null) { + IntrospectionSupport.setProperties(this, getTransportOptions()); } SSLContext sslContext = context == null ? null : context.getSSLContext(); // Get a reference to the current ssl context factory... - SslContextFactory factory = sslConnector.getSslContextFactory(); - - if (context != null) { - - // Should not be using this method since it does not use all of the values - // from the passed SslContext instance..... - factory.setSslContext(sslContext); + SslContextFactory factory; + if (contextFactory == null) { + factory = new SslContextFactory(); + if (context != null) { + // Should not be using this method since it does not use all of the values + // from the passed SslContext instance..... + factory.setSslContext(sslContext); + + } else { + if (keyStore != null) { + factory.setKeyStorePath(keyStore); + } + if (keyStorePassword != null) { + factory.setKeyStorePassword(keyStorePassword); + } + // if the keyPassword hasn't been set, default it to the + // key store password + if (keyPassword == null && keyStorePassword != null) { + factory.setKeyStorePassword(keyStorePassword); + } + if (keyStoreType != null) { + factory.setKeyStoreType(keyStoreType); + } + if (secureRandomCertficateAlgorithm != null) { + factory.setSecureRandomAlgorithm(secureRandomCertficateAlgorithm); + } + if (keyCertificateAlgorithm != null) { + factory.setSslKeyManagerFactoryAlgorithm(keyCertificateAlgorithm); + } + if (trustCertificateAlgorithm != null) { + factory.setTrustManagerFactoryAlgorithm(trustCertificateAlgorithm); + } + if (protocol != null) { + factory.setProtocol(protocol); + } + if (trustStore != null) { + setTrustStore(factory, trustStore); + } + if (trustStorePassword != null) { + factory.setTrustStorePassword(trustStorePassword); + } + } + factory.setNeedClientAuth(needClientAuth); + factory.setWantClientAuth(wantClientAuth); } else { + factory = contextFactory; + } - if (keyStore != null) { - factory.setKeyStorePath(keyStore); - } - if (keyStorePassword != null) { - factory.setKeyStorePassword(keyStorePassword); - } - // if the keyPassword hasn't been set, default it to the - // key store password - if (keyPassword == null && keyStorePassword != null) { - factory.setKeyStorePassword(keyStorePassword); - } - if (keyStoreType != null) { - factory.setKeyStoreType(keyStoreType); - } - if (secureRandomCertficateAlgorithm != null) { - factory.setSecureRandomAlgorithm(secureRandomCertficateAlgorithm); - } - if (keyCertificateAlgorithm != null) { - factory.setSslKeyManagerFactoryAlgorithm(keyCertificateAlgorithm); - } - if (trustCertificateAlgorithm != null) { - factory.setTrustManagerFactoryAlgorithm(trustCertificateAlgorithm); - } - if (protocol != null) { - factory.setProtocol(protocol); - } - if (trustStore != null) { - factory.setTrustStore(trustStore); - } - if (trustStorePassword != null) { - factory.setTrustStorePassword(trustStorePassword); + + if ("KRB".equals(auth) || "BOTH".equals(auth) + && Server.getVersion().startsWith("8")) { + return new Krb5AndCertsSslSocketConnector(factory, auth); + } else { + try { + Class<?> cls = Class.forName("org.eclipse.jetty.server.ssl.SslSelectChannelConnector", true, Server.class.getClassLoader()); + return (Connector)cls.getConstructor(SslContextFactory.class).newInstance(factory); + } catch (Throwable t) { + Class<?> c = Class.forName("org.eclipse.jetty.server.ServerConnector", true, Server.class.getClassLoader()); + Connector connector = (Connector)c.getConstructor(Server.class, SslContextFactory.class).newInstance(server, factory); + Server.class.getMethod("setStopTimeout", Long.TYPE).invoke(server, 500); + connector.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(connector, 500); + return connector; } - } - - factory.setNeedClientAuth(needClientAuth); - factory.setWantClientAuth(wantClientAuth); - - return sslConnector; } + private void setTrustStore(SslContextFactory factory, String trustStore2) throws Exception { + String mname = Server.getVersion().startsWith("8") ? "setTrustStore" : "setTrustStorePath"; + factory.getClass().getMethod(mname, String.class).invoke(factory, trustStore2); + } + + // Properties // -------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java index 36b800b..b982f18 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/SocketConnectorFactory.java @@ -21,15 +21,26 @@ import java.util.Map; import org.apache.activemq.util.IntrospectionSupport; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.nio.SelectChannelConnector; public class SocketConnectorFactory { private Map<String, Object> transportOptions; public Connector createConnector(Server server) throws Exception { - SelectChannelConnector connector = new SelectChannelConnector(); - IntrospectionSupport.setProperties(connector, transportOptions, ""); + Connector connector = null; + + try { + connector = (Connector)Class.forName("org.eclipse.jetty.server.nio.SelectChannelConnector", true, Server.class.getClassLoader()).newInstance(); + } catch (Throwable t) { + Class<?> c = Class.forName("org.eclipse.jetty.server.ServerConnector", true, Server.class.getClassLoader()); + connector = (Connector)c.getConstructor(Server.class).newInstance(server); + Server.class.getMethod("setStopTimeout", Long.TYPE).invoke(server, 500); + connector.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(connector, 500); + } + System.out.println(transportOptions); + if (transportOptions != null) { + IntrospectionSupport.setProperties(connector, transportOptions, ""); + } return connector; } http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java b/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java index 28c11a6..a52424e 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/WebTransportServerSupport.java @@ -35,6 +35,18 @@ abstract public class WebTransportServerSupport extends TransportServerSupport { super(location); } + private <T> void setConnectorProperty(String name, Class<T> type, T value) throws Exception { + connector.getClass().getMethod("set" + name, type).invoke(connector, value); + } + + protected void createServer() { + server = new Server(); + try { + server.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(server, 500l); + } catch (Throwable t) { + //ignore, jetty 8. + } + } public URI bind() throws Exception { URI bind = getBindLocation(); @@ -44,9 +56,11 @@ abstract public class WebTransportServerSupport extends TransportServerSupport { InetAddress addr = InetAddress.getByName(bindHost); host = addr.getCanonicalHostName(); - connector.setHost(host); - connector.setPort(bindAddress.getPort()); - connector.setServer(server); + setConnectorProperty("Host", String.class, host); + setConnectorProperty("Port", Integer.TYPE, bindAddress.getPort()); + if (Server.getVersion().startsWith("8")) { + connector.setServer(server); + } server.addConnector(connector); if (addr.isAnyLocalAddress()) { host = InetAddressUtil.getLocalHostName(); http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java index 96389ec..786e3aa 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/EmbeddedJettyServer.java @@ -19,7 +19,6 @@ package org.apache.activemq.transport.discovery.http; import java.net.URI; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -27,13 +26,16 @@ public class EmbeddedJettyServer implements org.apache.activemq.Service { private HTTPDiscoveryAgent agent; private Server server; - private SelectChannelConnector connector; private DiscoveryRegistryServlet camelServlet = new DiscoveryRegistryServlet(); public void start() throws Exception { URI uri = new URI(agent.getRegistryURL()); - server = new Server(); + int port = 80; + if( uri.getPort() >=0 ) { + port = uri.getPort(); + } + server = new Server(port); ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS); context.setContextPath("/"); @@ -42,23 +44,9 @@ public class EmbeddedJettyServer implements org.apache.activemq.Service { context.addServlet(holder, "/*"); server.setHandler(context); server.start(); - - int port = 80; - if( uri.getPort() >=0 ) { - port = uri.getPort(); - } - - connector = new SelectChannelConnector(); - connector.setPort(port); - server.addConnector(connector); - connector.start(); } public void stop() throws Exception { - if( connector!=null ) { - connector.stop(); - connector = null; - } if( server!=null ) { server.stop(); server = null; http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java index 0c7ecd9..8ae7874 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportServer.java @@ -27,8 +27,8 @@ import org.apache.activemq.transport.util.TextWireFormat; import org.apache.activemq.transport.xstream.XStreamWireFormat; import org.apache.activemq.util.ServiceStopper; import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.GzipHandler; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.slf4j.Logger; @@ -77,7 +77,7 @@ public class HttpTransportServer extends WebTransportServerSupport { @Override protected void doStart() throws Exception { - server = new Server(); + createServer(); if (connector == null) { connector = socketConnectorFactory.createConnector(server); } @@ -96,8 +96,7 @@ public class HttpTransportServer extends WebTransportServerSupport { contextHandler.setAttribute("transportFactory", transportFactory); contextHandler.setAttribute("transportOptions", transportOptions); - GzipHandler gzipHandler = new GzipHandler(); - contextHandler.setHandler(gzipHandler); + addGzipHandler(contextHandler); server.start(); @@ -105,8 +104,9 @@ public class HttpTransportServer extends WebTransportServerSupport { // was set to zero so that we report the actual port we are listening on. int port = boundTo.getPort(); - if (connector.getLocalPort() != -1) { - port = connector.getLocalPort(); + int p2 = getConnectorLocalPort(); + if (p2 != -1) { + port = p2; } setConnectURI(new URI(boundTo.getScheme(), @@ -118,6 +118,19 @@ public class HttpTransportServer extends WebTransportServerSupport { boundTo.getFragment())); } + private int getConnectorLocalPort() throws Exception { + return (Integer)connector.getClass().getMethod("getLocalPort").invoke(connector); + } + private void addGzipHandler(ServletContextHandler contextHandler) throws Exception { + Handler handler = null; + try { + handler = (Handler)Class.forName("org.eclipse.jetty.server.handler.GzipHandler", true, Handler.class.getClassLoader()).newInstance(); + } catch (Throwable t) { + handler = (Handler)Class.forName("org.eclipse.jetty.servlets.gzip.GzipHandler", true, Handler.class.getClassLoader()).newInstance(); + } + contextHandler.setHandler(handler); + } + @Override protected void doStop(ServiceStopper stopper) throws Exception { Server temp = server; http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java b/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java index 858c9ad..cf36122 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/https/Krb5AndCertsSslSocketConnector.java @@ -68,6 +68,14 @@ public class Krb5AndCertsSslSocketConnector extends SslSocketConnector { useCerts = true; setPasswords(); } + public Krb5AndCertsSslSocketConnector(SslContextFactory f, String auth) { + // By default, stick to cert based authentication + super(f); + useKrb = false; + useCerts = true; + setPasswords(); + setMode(auth); + } public static boolean isKrb(String mode) { return mode == MODE.KRB.toString() || mode == MODE.BOTH.toString(); http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java deleted file mode 100644 index 047c459..0000000 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.ws; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.command.Command; -import org.apache.activemq.transport.TransportSupport; -import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor; -import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; -import org.apache.activemq.transport.mqtt.MQTTTransport; -import org.apache.activemq.transport.mqtt.MQTTWireFormat; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.ServiceStopper; -import org.eclipse.jetty.websocket.WebSocket; -import org.fusesource.mqtt.codec.DISCONNECT; -import org.fusesource.mqtt.codec.MQTTFrame; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.security.cert.X509Certificate; -import java.util.concurrent.CountDownLatch; - -public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport, BrokerServiceAware { - - private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class); - Connection outbound; - MQTTProtocolConverter protocolConverter = null; - MQTTWireFormat wireFormat = new MQTTWireFormat(); - private final CountDownLatch socketTransportStarted = new CountDownLatch(1); - private BrokerService brokerService; - - @Override - public void onMessage(byte[] bytes, int offset, int length) { - if (!transportStartedAtLeastOnce()) { - LOG.debug("Waiting for StompSocket to be properly started..."); - try { - socketTransportStarted.await(); - } catch (InterruptedException e) { - LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); - } - } - - try { - MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length)); - getProtocolConverter().onMQTTCommand(frame); - } catch (Exception e) { - onException(IOExceptionSupport.create(e)); - } - } - - private MQTTProtocolConverter getProtocolConverter() { - if( protocolConverter == null ) { - protocolConverter = new MQTTProtocolConverter(this, brokerService); - } - return protocolConverter; - } - - @Override - public void onOpen(Connection connection) { - this.outbound = connection; - } - - @Override - public void onClose(int closeCode, String message) { - try { - getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); - } catch (Exception e) { - LOG.warn("Failed to close WebSocket", e); - } - } - - protected void doStart() throws Exception { - socketTransportStarted.countDown(); - } - - @Override - protected void doStop(ServiceStopper stopper) throws Exception { - } - - private boolean transportStartedAtLeastOnce() { - return socketTransportStarted.getCount() == 0; - } - - @Override - public int getReceiveCounter() { - return 0; - } - - @Override - public String getRemoteAddress() { - return "MQTTSocket_" + this.hashCode(); - } - - @Override - public void oneway(Object command) throws IOException { - try { - getProtocolConverter().onActiveMQCommand((Command) command); - } catch (Exception e) { - onException(IOExceptionSupport.create(e)); - } - } - - @Override - public void sendToActiveMQ(Command command) { - doConsume(command); - } - - @Override - public void sendToMQTT(MQTTFrame command) throws IOException { - ByteSequence bytes = wireFormat.marshal(command); - outbound.sendMessage(bytes.getData(), 0, bytes.getLength()); - } - - @Override - public X509Certificate[] getPeerCertificates() { - return new X509Certificate[0]; - } - - @Override - public MQTTInactivityMonitor getInactivityMonitor() { - return null; - } - - @Override - public MQTTWireFormat getWireFormat() { - return wireFormat; - } - - @Override - public void setBrokerService(BrokerService brokerService) { - this.brokerService = brokerService; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java deleted file mode 100644 index b0da09a..0000000 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompSocket.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.ws; - -import java.io.IOException; -import java.util.concurrent.CountDownLatch; - -import org.apache.activemq.command.Command; -import org.apache.activemq.transport.TransportSupport; -import org.apache.activemq.transport.stomp.ProtocolConverter; -import org.apache.activemq.transport.stomp.Stomp; -import org.apache.activemq.transport.stomp.StompFrame; -import org.apache.activemq.transport.stomp.StompInactivityMonitor; -import org.apache.activemq.transport.stomp.StompTransport; -import org.apache.activemq.transport.stomp.StompWireFormat; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.ServiceStopper; -import org.eclipse.jetty.websocket.WebSocket; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements web socket and mediates between servlet and the broker - */ -class StompSocket extends TransportSupport implements WebSocket.OnTextMessage, StompTransport { - private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class); - - Connection outbound; - ProtocolConverter protocolConverter = new ProtocolConverter(this, null); - StompWireFormat wireFormat = new StompWireFormat(); - private final CountDownLatch socketTransportStarted = new CountDownLatch(1); - private StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat); - - @Override - public void onOpen(Connection connection) { - this.outbound = connection; - } - - @Override - public void onClose(int closeCode, String message) { - try { - protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT)); - } catch (Exception e) { - LOG.warn("Failed to close WebSocket", e); - } - } - - @Override - public void onMessage(String data) { - - if (!transportStartedAtLeastOnce()) { - LOG.debug("Waiting for StompSocket to be properly started..."); - try { - socketTransportStarted.await(); - } catch (InterruptedException e) { - LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); - } - } - - - try { - protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")))); - } catch (Exception e) { - onException(IOExceptionSupport.create(e)); - } - } - - private boolean transportStartedAtLeastOnce() { - return socketTransportStarted.getCount() == 0; - } - - @Override - protected void doStart() throws Exception { - socketTransportStarted.countDown(); - } - - @Override - protected void doStop(ServiceStopper stopper) throws Exception { - } - - @Override - public int getReceiveCounter() { - return 0; - } - - @Override - public String getRemoteAddress() { - return "StompSocket_" + this.hashCode(); - } - - @Override - public void oneway(Object command) throws IOException { - try { - protocolConverter.onActiveMQCommand((Command)command); - } catch (Exception e) { - onException(IOExceptionSupport.create(e)); - } - } - - @Override - public void sendToActiveMQ(Command command) { - doConsume(command); - } - - @Override - public void sendToStomp(StompFrame command) throws IOException { - outbound.sendMessage(command.format()); - } - - @Override - public StompInactivityMonitor getInactivityMonitor() { - return stompInactivityMonitor; - } - - @Override - public StompWireFormat getWireFormat() { - return this.wireFormat; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java deleted file mode 100644 index d0ed22d..0000000 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSServlet.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.transport.ws; - -import java.io.IOException; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.TransportAcceptListener; -import org.eclipse.jetty.websocket.WebSocket; -import org.eclipse.jetty.websocket.WebSocketServlet; - -/** - * Handle connection upgrade requests and creates web sockets - */ -public class WSServlet extends WebSocketServlet { - private static final long serialVersionUID = -4716657876092884139L; - - private TransportAcceptListener listener; - - public void init() throws ServletException { - super.init(); - listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener"); - if (listener == null) { - throw new ServletException("No such attribute 'acceptListener' available in the ServletContext"); - } - } - - protected void doGet(HttpServletRequest request, HttpServletResponse response) - throws ServletException ,IOException { - getServletContext().getNamedDispatcher("default").forward(request,response); - } - - @Override - public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) { - WebSocket socket; - if (protocol != null && protocol.startsWith("mqtt")) { - socket = new MQTTSocket(); - } else { - socket = new StompSocket(); - } - listener.onAccept((Transport)socket); - return socket; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java index 4b75c9a..e26027d 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java @@ -49,7 +49,7 @@ public class WSTransportServer extends WebTransportServerSupport { @Override protected void doStart() throws Exception { - server = new Server(); + createServer(); if (connector == null) { connector = socketConnectorFactory.createConnector(server); @@ -69,7 +69,11 @@ public class WSTransportServer extends WebTransportServerSupport { } } - holder.setServlet(new WSServlet()); + if (Server.getVersion().startsWith("8")) { + holder.setServlet(new org.apache.activemq.transport.ws.jetty8.WSServlet()); + } else { + holder.setServlet(new org.apache.activemq.transport.ws.jetty9.WSServlet()); + } contextHandler.addServlet(holder, "/"); contextHandler.setAttribute("acceptListener", getAcceptListener()); @@ -79,9 +83,9 @@ public class WSTransportServer extends WebTransportServerSupport { // Update the Connect To URI with our actual location in case the configured port // was set to zero so that we report the actual port we are listening on. - int port = boundTo.getPort(); - if (connector.getLocalPort() != -1) { - port = connector.getLocalPort(); + int port = getConnectorLocalPort(); + if (port == -1) { + port = boundTo.getPort(); } setConnectURI(new URI(boundTo.getScheme(), @@ -95,6 +99,10 @@ public class WSTransportServer extends WebTransportServerSupport { LOG.info("Listening for connections at {}", getConnectURI()); } + private int getConnectorLocalPort() throws Exception { + return (Integer)connector.getClass().getMethod("getLocalPort").invoke(connector); + } + @Override protected void doStop(ServiceStopper stopper) throws Exception { Server temp = server; http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java new file mode 100644 index 0000000..58e9134 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/MQTTSocket.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws.jetty8; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor; +import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; +import org.apache.activemq.transport.mqtt.MQTTTransport; +import org.apache.activemq.transport.mqtt.MQTTWireFormat; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.eclipse.jetty.websocket.WebSocket; +import org.fusesource.mqtt.codec.DISCONNECT; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.security.cert.X509Certificate; +import java.util.concurrent.CountDownLatch; + +public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport, BrokerServiceAware { + + private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class); + Connection outbound; + MQTTProtocolConverter protocolConverter = null; + MQTTWireFormat wireFormat = new MQTTWireFormat(); + private final CountDownLatch socketTransportStarted = new CountDownLatch(1); + private BrokerService brokerService; + + @Override + public void onMessage(byte[] bytes, int offset, int length) { + if (!transportStartedAtLeastOnce()) { + LOG.debug("Waiting for StompSocket to be properly started..."); + try { + socketTransportStarted.await(); + } catch (InterruptedException e) { + LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); + } + } + + try { + MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length)); + getProtocolConverter().onMQTTCommand(frame); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + private MQTTProtocolConverter getProtocolConverter() { + if( protocolConverter == null ) { + protocolConverter = new MQTTProtocolConverter(this, brokerService); + } + return protocolConverter; + } + + @Override + public void onOpen(Connection connection) { + this.outbound = connection; + } + + @Override + public void onClose(int closeCode, String message) { + try { + getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); + } catch (Exception e) { + LOG.warn("Failed to close WebSocket", e); + } + } + + protected void doStart() throws Exception { + socketTransportStarted.countDown(); + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + } + + private boolean transportStartedAtLeastOnce() { + return socketTransportStarted.getCount() == 0; + } + + @Override + public int getReceiveCounter() { + return 0; + } + + @Override + public String getRemoteAddress() { + return "MQTTSocket_" + this.hashCode(); + } + + @Override + public void oneway(Object command) throws IOException { + try { + getProtocolConverter().onActiveMQCommand((Command) command); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + @Override + public void sendToActiveMQ(Command command) { + doConsume(command); + } + + @Override + public void sendToMQTT(MQTTFrame command) throws IOException { + ByteSequence bytes = wireFormat.marshal(command); + outbound.sendMessage(bytes.getData(), 0, bytes.getLength()); + } + + @Override + public X509Certificate[] getPeerCertificates() { + return new X509Certificate[0]; + } + + @Override + public MQTTInactivityMonitor getInactivityMonitor() { + return null; + } + + @Override + public MQTTWireFormat getWireFormat() { + return wireFormat; + } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java new file mode 100644 index 0000000..dba3ca9 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws.jetty8; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.transport.stomp.ProtocolConverter; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.transport.stomp.StompInactivityMonitor; +import org.apache.activemq.transport.stomp.StompTransport; +import org.apache.activemq.transport.stomp.StompWireFormat; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.eclipse.jetty.websocket.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements web socket and mediates between servlet and the broker + */ +class StompSocket extends TransportSupport implements WebSocket.OnTextMessage, StompTransport { + private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class); + + Connection outbound; + ProtocolConverter protocolConverter = new ProtocolConverter(this, null); + StompWireFormat wireFormat = new StompWireFormat(); + private final CountDownLatch socketTransportStarted = new CountDownLatch(1); + private StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat); + + @Override + public void onOpen(Connection connection) { + this.outbound = connection; + } + + @Override + public void onClose(int closeCode, String message) { + try { + protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT)); + } catch (Exception e) { + LOG.warn("Failed to close WebSocket", e); + } + } + + @Override + public void onMessage(String data) { + + if (!transportStartedAtLeastOnce()) { + LOG.debug("Waiting for StompSocket to be properly started..."); + try { + socketTransportStarted.await(); + } catch (InterruptedException e) { + LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); + } + } + + + try { + protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")))); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + private boolean transportStartedAtLeastOnce() { + return socketTransportStarted.getCount() == 0; + } + + @Override + protected void doStart() throws Exception { + socketTransportStarted.countDown(); + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + } + + @Override + public int getReceiveCounter() { + return 0; + } + + @Override + public String getRemoteAddress() { + return "StompSocket_" + this.hashCode(); + } + + @Override + public void oneway(Object command) throws IOException { + try { + protocolConverter.onActiveMQCommand((Command)command); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + @Override + public void sendToActiveMQ(Command command) { + doConsume(command); + } + + @Override + public void sendToStomp(StompFrame command) throws IOException { + outbound.sendMessage(command.format()); + } + + @Override + public StompInactivityMonitor getInactivityMonitor() { + return stompInactivityMonitor; + } + + @Override + public StompWireFormat getWireFormat() { + return this.wireFormat; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java new file mode 100644 index 0000000..d0f7b19 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/WSServlet.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.ws.jetty8; + +import java.io.IOException; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportAcceptListener; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketServlet; + +/** + * Handle connection upgrade requests and creates web sockets + */ +public class WSServlet extends WebSocketServlet { + private static final long serialVersionUID = -4716657876092884139L; + + private TransportAcceptListener listener; + + public void init() throws ServletException { + super.init(); + listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener"); + if (listener == null) { + throw new ServletException("No such attribute 'acceptListener' available in the ServletContext"); + } + } + + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException ,IOException { + getServletContext().getNamedDispatcher("default").forward(request,response); + } + + @Override + public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) { + WebSocket socket; + if (protocol != null && protocol.startsWith("mqtt")) { + socket = new MQTTSocket(); + } else { + socket = new StompSocket(); + } + listener.onAccept((Transport)socket); + return socket; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java new file mode 100644 index 0000000..4d7dac3 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws.jetty9; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor; +import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; +import org.apache.activemq.transport.mqtt.MQTTTransport; +import org.apache.activemq.transport.mqtt.MQTTWireFormat; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.fusesource.mqtt.codec.DISCONNECT; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.cert.X509Certificate; +import java.util.concurrent.CountDownLatch; + +public class MQTTSocket extends TransportSupport implements WebSocketListener, MQTTTransport, BrokerServiceAware { + + private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class); + Session session; + MQTTProtocolConverter protocolConverter = null; + MQTTWireFormat wireFormat = new MQTTWireFormat(); + private final CountDownLatch socketTransportStarted = new CountDownLatch(1); + private BrokerService brokerService; + + private MQTTProtocolConverter getProtocolConverter() { + if( protocolConverter == null ) { + protocolConverter = new MQTTProtocolConverter(this, brokerService); + } + return protocolConverter; + } + + protected void doStart() throws Exception { + socketTransportStarted.countDown(); + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + } + + private boolean transportStartedAtLeastOnce() { + return socketTransportStarted.getCount() == 0; + } + + @Override + public int getReceiveCounter() { + return 0; + } + + @Override + public String getRemoteAddress() { + return "MQTTSocket_" + this.hashCode(); + } + + @Override + public void oneway(Object command) throws IOException { + try { + getProtocolConverter().onActiveMQCommand((Command) command); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + @Override + public void sendToActiveMQ(Command command) { + doConsume(command); + } + + @Override + public void sendToMQTT(MQTTFrame command) throws IOException { + ByteSequence bytes = wireFormat.marshal(command); + session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength())); + } + + @Override + public X509Certificate[] getPeerCertificates() { + return new X509Certificate[0]; + } + + @Override + public MQTTInactivityMonitor getInactivityMonitor() { + return null; + } + + @Override + public MQTTWireFormat getWireFormat() { + return wireFormat; + } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + @Override + public void onWebSocketBinary(byte[] bytes, int offset, int length) { + if (!transportStartedAtLeastOnce()) { + LOG.debug("Waiting for StompSocket to be properly started..."); + try { + socketTransportStarted.await(); + } catch (InterruptedException e) { + LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); + } + } + + try { + MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length)); + getProtocolConverter().onMQTTCommand(frame); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + @Override + public void onWebSocketClose(int arg0, String arg1) { + try { + getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); + } catch (Exception e) { + LOG.warn("Failed to close WebSocket", e); + } + } + + @Override + public void onWebSocketConnect(Session session) { + this.session = session; + } + + @Override + public void onWebSocketError(Throwable arg0) { + + } + + @Override + public void onWebSocketText(String arg0) { + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java new file mode 100644 index 0000000..811f228 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws.jetty9; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import org.apache.activemq.command.Command; +import org.apache.activemq.transport.TransportSupport; +import org.apache.activemq.transport.stomp.ProtocolConverter; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.transport.stomp.StompInactivityMonitor; +import org.apache.activemq.transport.stomp.StompTransport; +import org.apache.activemq.transport.stomp.StompWireFormat; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceStopper; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements web socket and mediates between servlet and the broker + */ +class StompSocket extends TransportSupport implements WebSocketListener, StompTransport { + private static final Logger LOG = LoggerFactory.getLogger(StompSocket.class); + + Session session; + ProtocolConverter protocolConverter = new ProtocolConverter(this, null); + StompWireFormat wireFormat = new StompWireFormat(); + private final CountDownLatch socketTransportStarted = new CountDownLatch(1); + private StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat); + + private boolean transportStartedAtLeastOnce() { + return socketTransportStarted.getCount() == 0; + } + + @Override + protected void doStart() throws Exception { + socketTransportStarted.countDown(); + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + } + + @Override + public int getReceiveCounter() { + return 0; + } + + @Override + public String getRemoteAddress() { + return "StompSocket_" + this.hashCode(); + } + + @Override + public void oneway(Object command) throws IOException { + try { + protocolConverter.onActiveMQCommand((Command)command); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + + @Override + public void sendToActiveMQ(Command command) { + doConsume(command); + } + + @Override + public void sendToStomp(StompFrame command) throws IOException { + session.getRemote().sendString(command.format()); + } + + @Override + public StompInactivityMonitor getInactivityMonitor() { + return stompInactivityMonitor; + } + + @Override + public StompWireFormat getWireFormat() { + return this.wireFormat; + } + + @Override + public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) { + } + + @Override + public void onWebSocketClose(int arg0, String arg1) { + try { + protocolConverter.onStompCommand(new StompFrame(Stomp.Commands.DISCONNECT)); + } catch (Exception e) { + LOG.warn("Failed to close WebSocket", e); + } + } + + @Override + public void onWebSocketConnect(Session session) { + this.session = session; + } + + @Override + public void onWebSocketError(Throwable arg0) { + } + + @Override + public void onWebSocketText(String data) { + if (!transportStartedAtLeastOnce()) { + LOG.debug("Waiting for StompSocket to be properly started..."); + try { + socketTransportStarted.await(); + } catch (InterruptedException e) { + LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); + } + } + + try { + protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")))); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java new file mode 100644 index 0000000..15927b1 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.transport.ws.jetty9; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.activemq.transport.TransportAcceptListener; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +/** + * Handle connection upgrade requests and creates web sockets + */ +public class WSServlet extends WebSocketServlet { + private static final long serialVersionUID = -4716657876092884139L; + + private TransportAcceptListener listener; + + public void init() throws ServletException { + super.init(); + listener = (TransportAcceptListener)getServletContext().getAttribute("acceptListener"); + if (listener == null) { + throw new ServletException("No such attribute 'acceptListener' available in the ServletContext"); + } + } + + protected void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException ,IOException { + getServletContext().getNamedDispatcher("default").forward(request,response); + } + + + public void configure(WebSocketServletFactory factory) { + factory.setCreator(new WebSocketCreator() { + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) { + WebSocketListener socket; + if (req.getSubProtocols().contains("mqtt")) { + socket = new MQTTSocket(); + } else { + socket = new StompSocket(); + } + return socket; + } + }); + + } +} + http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java index 92bc1cb..140356e 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java @@ -32,16 +32,18 @@ import javax.net.ServerSocketFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.spring.SpringSslContext; +import org.apache.activemq.transport.SocketConnectorFactory; import org.apache.activemq.transport.stomp.StompConnection; import org.apache.activemq.util.Wait; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.webapp.WebAppContext; + import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; + import org.openqa.selenium.By; import org.openqa.selenium.WebDriver; import org.openqa.selenium.WebElement; @@ -97,7 +99,9 @@ public class WSTransportTest { Server server = new Server(); Connector connector = createJettyConnector(server); - connector.setServer(server); + if (Server.getVersion().startsWith("8")) { + connector.setServer(server); + } WebAppContext context = new WebAppContext(); context.setResourceBase("src/test/webapp"); @@ -129,10 +133,10 @@ public class WSTransportTest { return proxyPort; } - protected Connector createJettyConnector(Server server) { - SelectChannelConnector connector = new SelectChannelConnector(); - connector.setPort(getProxyPort()); - return connector; + protected Connector createJettyConnector(Server server) throws Exception { + Connector c = new SocketConnectorFactory().createConnector(server); + c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort()); + return c; } protected void stopBroker() throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java index ef61140..36b33f6 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportTest.java @@ -16,23 +16,23 @@ */ package org.apache.activemq.transport.wss; +import org.apache.activemq.transport.SecureSocketConnectorFactory; import org.apache.activemq.transport.ws.WSTransportTest; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ssl.SslSocketConnector; -import org.eclipse.jetty.util.ssl.SslContextFactory; public class WSSTransportTest extends WSTransportTest { @Override - protected Connector createJettyConnector(Server server) { - SslSocketConnector sslConnector = new SslSocketConnector(); - SslContextFactory contextFactory = sslConnector.getSslContextFactory(); - contextFactory.setKeyStorePath("src/test/resources/server.keystore"); - contextFactory.setKeyStorePassword("password"); - contextFactory.setTrustStore("src/test/resources/client.keystore"); - contextFactory.setTrustStorePassword("password"); - sslConnector.setPort(getProxyPort()); - return sslConnector; + protected Connector createJettyConnector(Server server) throws Exception { + SecureSocketConnectorFactory sscf = new SecureSocketConnectorFactory(); + sscf.setKeyStore("src/test/resources/server.keystore"); + sscf.setKeyStorePassword("password"); + sscf.setTrustStore("src/test/resources/client.keystore"); + sscf.setTrustStorePassword("password"); + + Connector c = sscf.createConnector(server); + c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort()); + return c; } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-osgi/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml index c56877a..74be454 100644 --- a/activemq-osgi/pom.xml +++ b/activemq-osgi/pom.xml @@ -52,7 +52,7 @@ com.fasterxml.jackson*;resolution:=optional, org.codehaus.jettison*;resolution:=optional, org.jasypt*;resolution:=optional, - org.eclipse.jetty*;resolution:=optional, + org.eclipse.jetty*;resolution:=optional;version="[8.1,10)", org.apache.zookeeper*;resolution:=optional, org.fusesource.leveldbjni*;resolution:=optional, org.fusesource.hawtjni*;resolution:=optional, http://git-wip-us.apache.org/repos/asf/activemq/blob/3f826250/activemq-web-console/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-web-console/pom.xml b/activemq-web-console/pom.xml index c14c40f..8002444 100755 --- a/activemq-web-console/pom.xml +++ b/activemq-web-console/pom.xml @@ -32,6 +32,7 @@ <properties> <jetty.port>8080</jetty.port> + <jetty.maven.groupid>org.mortbay.jetty</jetty.maven.groupid> </properties> <build> @@ -51,7 +52,7 @@ </configuration> </plugin> <plugin> - <groupId>org.mortbay.jetty</groupId> + <groupId>${jetty.maven.groupid}</groupId> <artifactId>jetty-maven-plugin</artifactId> <version>${jetty-version}</version> <configuration>
