Repository: activemq Updated Branches: refs/heads/master 07b0d913a -> 2e2d5ddd3
https://issues.apache.org/jira/browse/AMQ-6669 Respect the wireFormat.maxFrameSize option on WS and WSS transports allowing binary content larger than 65535 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2e2d5ddd Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2e2d5ddd Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2e2d5ddd Branch: refs/heads/master Commit: 2e2d5ddd3de7d0fe36ce5eb3d4fe81e97fe990a4 Parents: 07b0d91 Author: Timothy Bish <[email protected]> Authored: Thu May 4 16:37:53 2017 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu May 4 16:37:53 2017 -0400 ---------------------------------------------------------------------- .../transport/amqp/AmqpWSTransport.java | 5 +++ .../activemq/transport/amqp/AmqpWireFormat.java | 3 +- .../transport/amqp/AmqpWireFormatFactory.java | 2 +- .../transport/amqp/JMSClientContext.java | 34 +++++++++++++++- .../transport/amqp/JMSClientTestSupport.java | 34 +++++++++++++++- .../amqp/JMSLargeMessageSendRecvTest.java | 26 +++++++++++- .../transport/amqp/client/AmqpSession.java | 6 +++ .../AmqpBrokerReuqestedHearbeatsTest.java | 4 +- .../AmqpClientRequestsHeartbeatsTest.java | 4 +- .../amqp/interop/AmqpConnectionsTest.java | 43 ++++++++++++++++++++ .../amqp/interop/AmqpMaxFrameSizeTest.java | 9 +++- .../activemq/transport/ws/WSTransport.java | 5 +++ .../activemq/transport/ws/WSTransportProxy.java | 5 +++ .../transport/ws/WSTransportServer.java | 4 +- .../activemq/transport/ws/jetty9/WSServlet.java | 2 +- 15 files changed, 170 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java index 2ec3a09..26b3561 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java @@ -100,6 +100,11 @@ public class AmqpWSTransport extends TransportSupport implements WSTransport, AM } @Override + public int getMaxFrameSize() { + return (int) Math.min(((AmqpWireFormat) getWireFormat()).getMaxFrameSize(), Integer.MAX_VALUE); + } + + @Override protected void doStop(ServiceStopper stopper) throws Exception { // Currently nothing needed here since we have no async workers. } http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index 89facbe..7ddfc1f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -45,12 +45,13 @@ public class AmqpWireFormat implements WireFormat { public static final int DEFAULT_IDLE_TIMEOUT = 30000; public static final int DEFAULT_PRODUCER_CREDIT = 1000; public static final boolean DEFAULT_ALLOW_NON_SASL_CONNECTIONS = false; + public static final int DEFAULT_ANQP_FRAME_SIZE = NO_AMQP_MAX_FRAME_SIZE; private static final int SASL_PROTOCOL = 3; private int version = 1; private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; - private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE; + private int maxAmqpFrameSize = DEFAULT_ANQP_FRAME_SIZE; private int connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT; private int idelTimeout = DEFAULT_IDLE_TIMEOUT; private int producerCredit = DEFAULT_PRODUCER_CREDIT; http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java index bb428b4..196046c 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormatFactory.java @@ -26,7 +26,7 @@ import org.apache.activemq.wireformat.WireFormatFactory; public class AmqpWireFormatFactory implements WireFormatFactory { private long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE; - private int maxAmqpFrameSize = AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE; + private int maxAmqpFrameSize = AmqpWireFormat.DEFAULT_ANQP_FRAME_SIZE; private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT; private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT; private String transformer = InboundTransformer.TRANSFORMER_NATIVE; http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java index 574e9f0..f249e7c 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientContext.java @@ -168,9 +168,39 @@ public class JMSClientContext { private ConnectionFactory createConnectionFactory( URI remoteURI, String username, String password, boolean syncPublish) { - boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl"); + String clientScheme; + boolean useSSL = false; + + switch (remoteURI.getScheme()) { + case "tcp" : + case "amqp": + case "auto": + case "amqp+nio": + case "auto+nio": + clientScheme = "amqp://"; + break; + case "ssl": + case "amqp+ssl": + case "auto+ssl": + case "amqp+nio+ssl": + case "auto+nio+ssl": + clientScheme = "amqps://"; + useSSL = true; + break; + case "ws": + case "amqp+ws": + clientScheme = "amqpws://"; + break; + case "wss": + case "amqp+wss": + clientScheme = "amqpwss://"; + useSSL = true; + break; + default: + clientScheme = "amqp://"; + } - String amqpURI = (useSSL ? "amqps://" : "amqp://") + remoteURI.getHost() + ":" + remoteURI.getPort(); + String amqpURI = clientScheme + remoteURI.getHost() + ":" + remoteURI.getPort(); if (useSSL) { amqpURI += "?transport.verifyHost=false"; http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java index d855c6b..8408652 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java @@ -92,9 +92,39 @@ public class JMSClientTestSupport extends AmqpTestSupport { protected URI getAmqpURI(String uriOptions) { - boolean useSSL = getBrokerURI().getScheme().toLowerCase().contains("ssl"); + String clientScheme; + boolean useSSL = false; + + switch (getBrokerURI().getScheme()) { + case "tcp" : + case "amqp": + case "auto": + case "amqp+nio": + case "auto+nio": + clientScheme = "amqp://"; + break; + case "ssl": + case "amqp+ssl": + case "auto+ssl": + case "amqp+nio+ssl": + case "auto+nio+ssl": + clientScheme = "amqps://"; + useSSL = true; + break; + case "ws": + case "amqp+ws": + clientScheme = "amqpws://"; + break; + case "wss": + case "amqp+wss": + clientScheme = "amqpwss://"; + useSSL = true; + break; + default: + clientScheme = "amqp://"; + } - String amqpURI = (useSSL ? "amqps://" : "amqp://") + getBrokerURI().getHost() + ":" + getBrokerURI().getPort(); + String amqpURI = clientScheme + getBrokerURI().getHost() + ":" + getBrokerURI().getPort(); if (uriOptions != null && !uriOptions.isEmpty()) { if (uriOptions.startsWith("?") || uriOptions.startsWith("&")) { http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java index ef6eaba..20ad2d9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java @@ -20,6 +20,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.Arrays; +import java.util.Collection; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -29,13 +32,32 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JMSLargeMessageSendRecvTest extends AmqpTestSupport { +@RunWith(Parameterized.class) +public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport { + + @Parameters(name="{0}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {"amqp", false}, + {"amqp+ws", false}, + {"amqp+ssl", true}, + {"amqp+wss", true} + }); + } + + public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) { + super(connectorScheme, secure); + } @Rule public TestName testName = new TestName(); @@ -77,7 +99,7 @@ public class JMSLargeMessageSendRecvTest extends AmqpTestSupport { String payload = createLargeString(expectedSize); assertEquals(expectedSize, payload.getBytes().length); - Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI); + Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerAmqpConnectionURI()); long startTime = System.currentTimeMillis(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(testName.getMethodName()); http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index b8d38e2..8956692 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -681,6 +681,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> { //----- Private implementation details -----------------------------------// @Override + protected void doOpen() { + getEndpoint().setIncomingCapacity(Integer.MAX_VALUE); + super.doOpen(); + } + + @Override protected void doOpenInspection() { try { getStateInspector().inspectOpenedResource(getSession()); http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java index dc13369..e794274 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java @@ -44,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport { - private final int TEST_IDLE_TIMEOUT = 3000; + private final int TEST_IDLE_TIMEOUT = 1000; @Parameters(name="connector={0}") public static Collection<Object[]> data() { @@ -165,7 +165,7 @@ public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport { connection.connect(); assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); - assertFalse(disconnected.await(10, TimeUnit.SECONDS)); + assertFalse(disconnected.await(5, TimeUnit.SECONDS)); connection.close(); http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java index de47fd2..97b38fb 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java @@ -44,7 +44,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport { - private final int TEST_IDLE_TIMEOUT = 3000; + private final int TEST_IDLE_TIMEOUT = 1000; @Parameters(name="connector={0}") public static Collection<Object[]> data() { @@ -106,7 +106,7 @@ public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport { connection.connect(); assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); - assertFalse(disconnected.await(10, TimeUnit.SECONDS)); + assertFalse(disconnected.await(5, TimeUnit.SECONDS)); assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); connection.close(); http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java index 11cade7..a3474a9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java @@ -28,11 +28,16 @@ import static org.junit.Assert.fail; import java.util.Arrays; import java.util.Collection; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.amqp.AmqpSupport; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.AmqpError; @@ -258,4 +263,42 @@ public class AmqpConnectionsTest extends AmqpClientTestSupport { connection1.close(); assertEquals(0, getProxyToBroker().getCurrentConnectionsCount()); } + + @Test(timeout = 60000) + public void testSimpleSendOneReceive() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + AmqpMessage message = new AmqpMessage(); + + final int PAYLOAD_SIZE = 1024 * 1024; + + byte[] payload = new byte[PAYLOAD_SIZE]; + for (int i = 0; i < PAYLOAD_SIZE; i++) { + payload[i] = (byte) (i % PAYLOAD_SIZE); + } + + message.setMessageId("msg" + 1); + message.setMessageAnnotation("serialNo", 1); + message.setBytes(payload); + + sender.send(message); + sender.close(); + + LOG.info("Attempting to read message with receiver"); + receiver.flow(2); + AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull("Should have read message", received); + assertEquals("msg1", received.getMessageId()); + received.accept(); + + receiver.close(); + + connection.close(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java index c818abe..84415d7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpMaxFrameSizeTest.java @@ -43,6 +43,8 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { + private final int TEST_IDLE_TIMEOUT = 500; + private final String testName; private final int maxFrameSize; private final int maxAmqpFrameSize; @@ -54,6 +56,8 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { { "amqp-> MFS < MAFS", "amqp", false, 2048, 1024 }, { "amqp+nio-> MFS > MAFS", "amqp+nio", false, 1024, 2048 }, { "amqp+nio-> MFS < MAFS", "amqp+nio", false, 2048, 1024 }, + { "amqp+ws-> MFS > MAFS", "amqp+ws", false, 1024, 2048 }, + { "amqp+ws-> MFS < MAFS", "amqp+ws", false, 2048, 1024 }, }); } @@ -89,12 +93,13 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { } }); + connection.setIdleTimeout(TEST_IDLE_TIMEOUT); connection.connect(); AmqpSession session = connection.createSession(); AmqpSender sender = session.createSender("queue://" + getTestName(), true); - byte[] payload = new byte[maxFrameSize]; + byte[] payload = new byte[maxFrameSize * 2]; for (int i = 0; i < payload.length; ++i) { payload[i] = 42; } @@ -104,7 +109,7 @@ public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { sender.send(message); - assertTrue("Connection should have failed", failed.await(10, TimeUnit.SECONDS)); + assertTrue("Connection should have failed", failed.await(30, TimeUnit.SECONDS)); assertNotNull(getProxyToQueue(getTestName())); assertEquals(0, getProxyToQueue(getTestName()).getQueueSize()); http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java index e15f86f..9a30660 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java @@ -56,6 +56,11 @@ public interface WSTransport extends Transport { } /** + * @return the maximum frame size allowed for this WS Transport. + */ + int getMaxFrameSize(); + + /** * @return the WS sub-protocol that this transport is supplying. */ String getSubProtocol(); http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java index 0ca80ef..d5a5207 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java @@ -218,6 +218,11 @@ public final class WSTransportProxy extends TransportSupport implements Transpor @Override public void onWebSocketConnect(Session session) { this.session = session; + + if (wsTransport.getMaxFrameSize() > 0) { + this.session.getPolicy().setMaxBinaryMessageSize(wsTransport.getMaxFrameSize()); + this.session.getPolicy().setMaxTextMessageSize(wsTransport.getMaxFrameSize()); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java index 3029668..ea8867d 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportServer.java @@ -145,9 +145,11 @@ public class WSTransportServer extends WebTransportServerSupport implements Brok @Override public void setTransportOption(Map<String, Object> transportOptions) { + // String transport from options and Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(transportOptions, "transport."); socketConnectorFactory.setTransportOptions(socketOptions); - super.setTransportOption(socketOptions); + transportOptions.putAll(socketOptions); + super.setTransportOption(transportOptions); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/2e2d5ddd/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java index 21754ad..8cb3811 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java @@ -112,7 +112,7 @@ public class WSServlet extends WebSocketServlet implements BrokerServiceAware { switch (requestedProtocol) { case MQTT: socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); - ((MQTTSocket) socket).setTransportOptions(new HashMap<String, Object>(transportOptions)); + ((MQTTSocket) socket).setTransportOptions(new HashMap<>(transportOptions)); ((MQTTSocket) socket).setPeerCertificates(req.getCertificates()); resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols, req.getSubProtocols(), "mqtt")); break;
