Repository: activemq Updated Branches: refs/heads/master 83827f277 -> 31c55f751
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java index 4110fcb..3029668 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java @@ -23,6 +23,8 @@ import java.util.Map; import javax.servlet.Servlet; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.transport.SocketConnectorFactory; import org.apache.activemq.transport.WebTransportServerSupport; @@ -41,10 +43,13 @@ import org.slf4j.LoggerFactory; * Creates a web server and registers web socket server * */ -public class WSTransportServer extends WebTransportServerSupport { +public class WSTransportServer extends WebTransportServerSupport implements BrokerServiceAware { private static final Logger LOG = LoggerFactory.getLogger(WSTransportServer.class); + private BrokerService brokerService; + private WSServlet servlet; + public WSTransportServer(URI location) { super(location); this.bindAddress = location; @@ -105,8 +110,10 @@ public class WSTransportServer extends WebTransportServerSupport { } private Servlet createWSServlet() throws Exception { - WSServlet servlet = new WSServlet(); + servlet = new WSServlet(); servlet.setTransportOptions(transportOptions); + servlet.setBrokerService(brokerService); + return servlet; } @@ -147,4 +154,12 @@ public class WSTransportServer extends WebTransportServerSupport { public boolean isSslServer() { return false; } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + if (servlet != null) { + servlet.setBrokerService(brokerService); + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java index 338be98..1f7c5e7 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java @@ -18,17 +18,26 @@ package org.apache.activemq.transport.ws.jetty9; import java.io.IOException; -import java.util.*; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.activemq.jms.pool.IntrospectionSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportAcceptListener; +import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.util.HttpTransportUtils; +import org.apache.activemq.transport.ws.WSTransportProxy; import org.eclipse.jetty.websocket.api.WebSocketListener; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; @@ -39,16 +48,21 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; /** * Handle connection upgrade requests and creates web sockets */ -public class WSServlet extends WebSocketServlet { +public class WSServlet extends WebSocketServlet implements BrokerServiceAware { private static final long serialVersionUID = -4716657876092884139L; private TransportAcceptListener listener; - private final static Map<String, Integer> stompProtocols = new ConcurrentHashMap<> (); - private final static Map<String, Integer> mqttProtocols = new ConcurrentHashMap<> (); + private final static Map<String, Integer> stompProtocols = new ConcurrentHashMap<>(); + private final static Map<String, Integer> mqttProtocols = new ConcurrentHashMap<>(); private Map<String, Object> transportOptions; + private BrokerService brokerService; + + private enum Protocol { + MQTT, STOMP, UNKNOWN + } static { stompProtocols.put("v12.stomp", 3); @@ -80,41 +94,98 @@ public class WSServlet extends WebSocketServlet { @Override public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) { WebSocketListener socket; - boolean isMqtt = false; - for (String subProtocol : req.getSubProtocols()) { - if (subProtocol.startsWith("mqtt")) { - isMqtt = true; + Protocol requestedProtocol = Protocol.UNKNOWN; + + // When no sub-protocol is requested we default to STOMP for legacy reasons. + if (!req.getSubProtocols().isEmpty()) { + for (String subProtocol : req.getSubProtocols()) { + if (subProtocol.startsWith("mqtt")) { + requestedProtocol = Protocol.MQTT; + } else if (subProtocol.contains("stomp")) { + requestedProtocol = Protocol.STOMP; + } } - } - if (isMqtt) { - socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); - resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols,req.getSubProtocols(), "mqtt")); - ((MQTTSocket)socket).setTransportOptions(new HashMap(transportOptions)); - ((MQTTSocket)socket).setPeerCertificates(req.getCertificates()); } else { - socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); - ((StompSocket)socket).setCertificates(req.getCertificates()); - resp.setAcceptedSubProtocol(getAcceptedSubProtocol(stompProtocols,req.getSubProtocols(), "stomp")); + requestedProtocol = Protocol.STOMP; + } + + switch (requestedProtocol) { + case MQTT: + socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); + ((MQTTSocket) socket).setTransportOptions(new HashMap<String, Object>(transportOptions)); + ((MQTTSocket) socket).setPeerCertificates(req.getCertificates()); + resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols, req.getSubProtocols(), "mqtt")); + break; + case UNKNOWN: + socket = findWSTransport(req, resp); + if (socket != null) { + break; + } + case STOMP: + socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); + ((StompSocket) socket).setPeerCertificates(req.getCertificates()); + resp.setAcceptedSubProtocol(getAcceptedSubProtocol(stompProtocols, req.getSubProtocols(), "stomp")); + break; + default: + socket = null; + listener.onAcceptError(new IOException("Unknown protocol requested")); + break; + } + + if (socket != null) { + listener.onAccept((Transport) socket); } - listener.onAccept((Transport) socket); + return socket; } }); } - private String getAcceptedSubProtocol(final Map<String, Integer> protocols, - List<String> subProtocols, String defaultProtocol) { + private WebSocketListener findWSTransport(ServletUpgradeRequest request, ServletUpgradeResponse response) { + WSTransportProxy proxy = null; + + for (String subProtocol : request.getSubProtocols()) { + try { + String remoteAddress = HttpTransportUtils.generateWsRemoteAddress(request.getHttpServletRequest(), subProtocol); + URI remoteURI = new URI(remoteAddress); + + TransportFactory factory = TransportFactory.findTransportFactory(remoteURI); + + if (factory instanceof BrokerServiceAware) { + ((BrokerServiceAware) factory).setBrokerService(brokerService); + } + + Transport transport = factory.doConnect(remoteURI); + + proxy = new WSTransportProxy(remoteAddress, transport); + proxy.setPeerCertificates(request.getCertificates()); + proxy.setTransportOptions(transportOptions); + + response.setAcceptedSubProtocol(proxy.getSubProtocol()); + } catch (Exception e) { + proxy = null; + + // Keep going and try any other sub-protocols present. + continue; + } + } + + return proxy; + } + + private String getAcceptedSubProtocol(final Map<String, Integer> protocols, List<String> subProtocols, String defaultProtocol) { List<SubProtocol> matchedProtocols = new ArrayList<>(); if (subProtocols != null && subProtocols.size() > 0) { - //detect which subprotocols match accepted protocols and add to the list + // detect which subprotocols match accepted protocols and add to the + // list for (String subProtocol : subProtocols) { Integer priority = protocols.get(subProtocol); - if(subProtocol != null && priority != null) { - //only insert if both subProtocol and priority are not null + if (subProtocol != null && priority != null) { + // only insert if both subProtocol and priority are not null matchedProtocols.add(new SubProtocol(subProtocol, priority)); } } - //sort the list by priority + // sort the list by priority if (matchedProtocols.size() > 0) { Collections.sort(matchedProtocols, new Comparator<SubProtocol>() { @Override @@ -131,6 +202,7 @@ public class WSServlet extends WebSocketServlet { private class SubProtocol { private String protocol; private Integer priority; + public SubProtocol(String protocol, Integer priority) { this.protocol = protocol; this.priority = priority; @@ -140,4 +212,9 @@ public class WSServlet extends WebSocketServlet { public void setTransportOptions(Map<String, Object> transportOptions) { this.transportOptions = transportOptions; } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java index 34e8502..05a8159 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportFactory.java @@ -22,6 +22,8 @@ import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.SslContext; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportServer; @@ -32,7 +34,9 @@ import org.apache.activemq.util.URISupport; /** * Factory for Secure WebSocket (wss) transport */ -public class WSSTransportFactory extends TransportFactory { +public class WSSTransportFactory extends TransportFactory implements BrokerServiceAware { + + private BrokerService brokerService; @Override public TransportServer doBind(URI location) throws IOException { @@ -44,9 +48,15 @@ public class WSSTransportFactory extends TransportFactory { IntrospectionSupport.setProperties(result, transportOptions); result.setTransportOption(transportOptions); result.setHttpOptions(httpOptions); + result.setBrokerService(brokerService); return result; } catch (URISyntaxException e) { throw IOExceptionSupport.create(e); } } + + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java index fdbf867..c4e8c47 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java @@ -250,9 +250,6 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe } } - /* (non-Javadoc) - * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketClose(int, java.lang.String) - */ @Override public void onWebSocketClose(int statusCode, String reason) { LOG.trace("MQTT WS Connection closed, code:{} message:{}", statusCode, reason); @@ -263,14 +260,9 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe } - /* (non-Javadoc) - * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketConnect(org.eclipse.jetty.websocket.api.Session) - */ @Override - public void onWebSocketConnect( - org.eclipse.jetty.websocket.api.Session session) { + public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) { this.connection = session; this.connectLatch.countDown(); } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java index 83cbd69..844c661 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java @@ -43,13 +43,9 @@ public class StompWSConnectionTimeoutTest extends WSTransportTestSupport { super.setUp(); wsStompConnection = new StompWSConnection(); -// WebSocketClientFactory clientFactory = new WebSocketClientFactory(); -// clientFactory.start(); wsClient = new WebSocketClient(); wsClient.start(); - - wsClient.connect(wsStompConnection, wsConnectUri); if (!wsStompConnection.awaitConnection(30, TimeUnit.SECONDS)) { throw new IOException("Could not connect to STOMP WS endpoint"); http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java index edf7b6c..c83f24d 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java @@ -69,18 +69,16 @@ public class WSTransportTestSupport { LOG.info("========== Finished test: {} ==========", name.getMethodName()); } -// protected String getWSConnectorURI() { -// return "ws://127.0.0.1:" + getProxyPort() + -// "?allowLinkStealing=" + isAllowLinkStealing() + -// "&websocket.maxTextMessageSize=99999&" + -// "transport.maxIdleTime=1001"; -// } + protected String getWSConnectionURI() { + return "ws://127.0.0.1:" + getProxyPort(); + } protected String getWSConnectorURI() { return "ws://127.0.0.1:" + getProxyPort() + - "?allowLinkStealing=" + isAllowLinkStealing() + - "&websocket.maxTextMessageSize=99999&" + - "transport.idleTimeout=1001"; + "?allowLinkStealing=" + isAllowLinkStealing() + + "&websocket.maxTextMessageSize=99999" + + "&transport.idleTimeout=1001" + + "&trace=true&transport.trace=true"; } protected boolean isAllowLinkStealing() { http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/resources/log4j.properties b/activemq-http/src/test/resources/log4j.properties index aa64270..2aabf97 100755 --- a/activemq-http/src/test/resources/log4j.properties +++ b/activemq-http/src/test/resources/log4j.properties @@ -20,8 +20,10 @@ # log4j.rootLogger=INFO, out, stdout -log4j.logger.org.apache.activemq.transport.ws=DEBUG +log4j.logger.org.apache.activemq.transport.ws=TRACE log4j.logger.org.apache.activemq.transport.http=DEBUG +log4j.logger.org.apache.activemq.transport.amqp=TRACE +log4j.logger.org.apache.activemq.transport.amqp.FRAMES=TRACE #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index ef3f003..a5f114e 100755 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -64,10 +64,6 @@ </dependency> <dependency> <groupId>org.apache.activemq</groupId> - <artifactId>activemq-amqp</artifactId> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> <artifactId>activemq-partition</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java deleted file mode 100644 index c4a2fd7..0000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.conversions; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.UnsupportedEncodingException; -import java.util.Arrays; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.qpid.jms.JmsConnectionFactory; -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.QoS; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AmqpAndMqttTest { - - protected BrokerService broker; - private TransportConnector amqpConnector; - private TransportConnector mqttConnector; - - @Before - public void setUp() throws Exception { - broker = createBroker(); - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - broker = null; - } - } - - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.setAdvisorySupport(false); - broker.setSchedulerSupport(false); - - amqpConnector = broker.addConnector("amqp://0.0.0.0:0"); - mqttConnector = broker.addConnector("mqtt://0.0.0.0:0"); - - return broker; - } - - @Test(timeout = 60000) - public void testFromMqttToAmqp() throws Exception { - Connection amqp = createAmqpConnection(); - Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createTopic("topic://FOO")); - - final BlockingConnection mqtt = createMQTTConnection().blockingConnection(); - mqtt.connect(); - byte[] payload = bytes("Hello World"); - mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false); - mqtt.disconnect(); - - Message msg = consumer.receive(1000 * 5); - assertNotNull(msg); - assertTrue(msg instanceof BytesMessage); - - BytesMessage bmsg = (BytesMessage) msg; - byte[] actual = new byte[(int) bmsg.getBodyLength()]; - bmsg.readBytes(actual); - assertTrue(Arrays.equals(actual, payload)); - amqp.close(); - } - - private byte[] bytes(String value) { - try { - return value.getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - - protected MQTT createMQTTConnection() throws Exception { - MQTT mqtt = new MQTT(); - mqtt.setConnectAttemptsMax(1); - mqtt.setReconnectAttemptsMax(0); - mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort()); - return mqtt; - } - - public Connection createAmqpConnection() throws Exception { - - String amqpURI = "amqp://localhost:" + amqpConnector.getConnectUri().getPort(); - - final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI); - - factory.setUsername("admin"); - factory.setPassword("password"); - - final Connection connection = factory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.start(); - return connection; - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java index 8fb70ec..24a1dcd 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/StubTransport.java @@ -17,26 +17,27 @@ package org.apache.activemq.transport; import java.io.IOException; +import java.security.cert.X509Certificate; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.wireformat.WireFormat; -/** - * - * - */ public class StubTransport extends TransportSupport { private Queue<Object> queue = new ConcurrentLinkedQueue<Object>(); private volatile int receiveCounter; + @Override protected void doStop(ServiceStopper stopper) throws Exception { } + @Override protected void doStart() throws Exception { } + @Override public void oneway(Object command) throws IOException { receiveCounter++; queue.add(command); @@ -46,12 +47,28 @@ public class StubTransport extends TransportSupport { return queue; } + @Override public String getRemoteAddress() { return null; } + @Override public int getReceiveCounter() { return receiveCounter; } + @Override + public X509Certificate[] getPeerCertificates() { + return null; + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + + } + + @Override + public WireFormat getWireFormat() { + return null; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java index 9054e1a..82d87ba 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/auto/AutoTransportConfigureTest.java @@ -35,7 +35,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; - @RunWith(Parameterized.class) public class AutoTransportConfigureTest { @@ -49,12 +48,7 @@ public class AutoTransportConfigureTest { @Parameters public static Iterable<Object[]> parameters() { - return Arrays.asList(new Object[][]{ - {"auto"}, - {"auto+nio"}, - {"auto+ssl"}, - {"auto+nio+ssl"} - }); + return Arrays.asList(new Object[][] { { "auto" }, { "auto+nio" }, { "auto+ssl" }, { "auto+nio+ssl" } }); } private String transportType; @@ -76,7 +70,7 @@ public class AutoTransportConfigureTest { } @After - public void tearDown() throws Exception{ + public void tearDown() throws Exception { if (this.brokerService != null) { this.brokerService.stop(); this.brokerService.waitUntilStopped(); @@ -92,7 +86,7 @@ public class AutoTransportConfigureTest { } - @Test(expected=JMSException.class) + @Test(expected = JMSException.class) public void testUrlConfiguration() throws Exception { createBroker(transportType + "://localhost:0?wireFormat.maxFrameSize=10"); @@ -100,7 +94,7 @@ public class AutoTransportConfigureTest { sendMessage(factory.createConnection()); } - @Test(expected=JMSException.class) + @Test(expected = JMSException.class) public void testUrlConfigurationOpenWireFail() throws Exception { createBroker(transportType + "://localhost:0?wireFormat.default.maxFrameSize=10"); @@ -110,17 +104,17 @@ public class AutoTransportConfigureTest { @Test public void testUrlConfigurationOpenWireSuccess() throws Exception { - //Will work because max frame size only applies to amqp - createBroker(transportType + "://localhost:0?wireFormat.amqp.maxFrameSize=10"); + // Will work because max frame size only applies to stomp + createBroker(transportType + "://localhost:0?wireFormat.stomp.maxFrameSize=10"); ConnectionFactory factory = new ActiveMQConnectionFactory(url); sendMessage(factory.createConnection()); } - @Test(expected=JMSException.class) + @Test(expected = JMSException.class) public void testUrlConfigurationOpenWireNotAvailable() throws Exception { - //only amqp is available so should fail - createBroker(transportType + "://localhost:0?auto.protocols=amqp"); + // only stomp is available so should fail + createBroker(transportType + "://localhost:0?auto.protocols=stomp"); ConnectionFactory factory = new ActiveMQConnectionFactory(url); sendMessage(factory.createConnection()); @@ -128,7 +122,7 @@ public class AutoTransportConfigureTest { @Test public void testUrlConfigurationOpenWireAvailable() throws Exception { - //only open wire is available + // only open wire is available createBroker(transportType + "://localhost:0?auto.protocols=default"); ConnectionFactory factory = new ActiveMQConnectionFactory(url); @@ -137,13 +131,12 @@ public class AutoTransportConfigureTest { @Test public void testUrlConfigurationOpenWireAndAmqpAvailable() throws Exception { - createBroker(transportType + "://localhost:0?auto.protocols=default,amqp"); + createBroker(transportType + "://localhost:0?auto.protocols=default,stomp"); ConnectionFactory factory = new ActiveMQConnectionFactory(url); sendMessage(factory.createConnection()); } - protected void sendMessage(Connection connection) throws JMSException { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -152,5 +145,4 @@ public class AutoTransportConfigureTest { message.setText("this is a test"); producer.send(message); } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index cf24896..f3c47ae 100755 --- a/pom.xml +++ b/pom.xml @@ -103,7 +103,7 @@ <zookeeper-version>3.4.6</zookeeper-version> <qpid-proton-version>0.13.0</qpid-proton-version> <qpid-jms-version>0.9.0</qpid-jms-version> - <netty-all-version>4.0.33.Final</netty-all-version> + <netty-all-version>4.0.37.Final</netty-all-version> <regexp-version>1.3</regexp-version> <rome-version>1.0</rome-version> <saxon-version>9.5.1-5</saxon-version>
