This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b843e12852 ARTEMIS-2748 Support WebSocket compression in the transport
pipeline
b843e12852 is described below
commit b843e128522eb36168350c303fb89434e6102a26
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Oct 14 10:15:55 2024 -0400
ARTEMIS-2748 Support WebSocket compression in the transport pipeline
Adds support for WebSocket compression using the netty server handler to
enable per message compression and decompression as a transparent layer of
the netty pipeine.
---
.../remoting/impl/netty/TransportConstants.java | 9 ++
.../artemis/core/protocol/ProtocolHandler.java | 18 ++-
.../protocol/websocket/WebSocketServerHandler.java | 6 +-
.../websocket/WebSocketServerHandlerTest.java | 2 +-
.../amqp/AmqpWebSocketConnectionTest.java | 128 +++++++++++++++++++++
5 files changed, 158 insertions(+), 5 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 7859fec4a6..91ebe95cd9 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -351,6 +351,15 @@ public class TransportConstants {
public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10;
+ /*
+ * Defines if the WS acceptor allows a client to request compression via WS
extensions for
+ * per message deflate. By default this is not enabled and the WS upgrade
response will not
+ * carry any compression support headers when the client indicates it
supports compression.
+ */
+ public static final String WEB_SOCKET_COMPRESSION_SUPPORTED =
"webSocketCompressionSupported";
+
+ public static final boolean DEFAULT_WEB_SOCKET_COMPRESSION_SUPPORTED =
false;
+
public static final String QUIET_PERIOD = "quietPeriod";
public static final String DISABLE_STOMP_SERVER_HEADER =
"disableStompServerHeader";
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index 064c5023ee..e5efc05603 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -37,6 +37,8 @@ import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
+import
io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
+
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.remoting.impl.netty.ConnectionCreator;
@@ -152,15 +154,27 @@ public class ProtocolHandler {
FullHttpRequest request = (FullHttpRequest) msg;
HttpHeaders headers = request.headers();
String upgrade = headers.get("upgrade");
+
if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) {
int stompMaxFramePayloadLength =
ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH,
-1, nettyAcceptor.getConfiguration());
if (stompMaxFramePayloadLength != -1) {
ActiveMQServerLogger.LOGGER.deprecatedConfigurationOption(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH,
TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH);
}
stompMaxFramePayloadLength = stompMaxFramePayloadLength != -1 ?
stompMaxFramePayloadLength :
TransportConstants.DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH;
+
int webSocketMaxFramePayloadLength =
ConfigurationHelper.getIntProperty(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH,
-1, nettyAcceptor.getConfiguration());
- String encoderConfigType =
ConfigurationHelper.getStringProperty(TransportConstants.WEB_SOCKET_ENCODER_TYPE,
TransportConstants.DEFAULT_WEB_SOCKET_ENCODER_TYPE,
nettyAcceptor.getConfiguration());
- ctx.pipeline().addLast("websocket-handler", new
WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength
!= -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength,
WebSocketFrameEncoderType.valueOfType(encoderConfigType)));
+ webSocketMaxFramePayloadLength = webSocketMaxFramePayloadLength
!= -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength;
+
+ final boolean enableCompression =
ConfigurationHelper.getBooleanProperty(
+ TransportConstants.WEB_SOCKET_COMPRESSION_SUPPORTED,
TransportConstants.DEFAULT_WEB_SOCKET_COMPRESSION_SUPPORTED,
nettyAcceptor.getConfiguration());
+ final String encoderConfigType =
ConfigurationHelper.getStringProperty(
+ TransportConstants.WEB_SOCKET_ENCODER_TYPE,
TransportConstants.DEFAULT_WEB_SOCKET_ENCODER_TYPE,
nettyAcceptor.getConfiguration());
+
+ if (enableCompression) {
+ ctx.pipeline().addLast(new
WebSocketServerCompressionHandler());
+ }
+
+ ctx.pipeline().addLast("websocket-handler", new
WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength,
WebSocketFrameEncoderType.valueOfType(encoderConfigType), enableCompression));
ctx.pipeline().addLast(new ProtocolDecoder(false, false));
ctx.pipeline().remove(this);
ctx.pipeline().remove(HTTP_HANDLER);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
index e15fe21991..abde34b9fa 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
@@ -51,12 +51,14 @@ public class WebSocketServerHandler extends
SimpleChannelInboundHandler<Object>
private WebSocketServerHandshaker handshaker;
private List<String> supportedProtocols;
private int maxFramePayloadLength;
+ private boolean allowExtensions;
private WebSocketFrameEncoderType encoderType;
- public WebSocketServerHandler(List<String> supportedProtocols, int
maxFramePayloadLength, WebSocketFrameEncoderType encoderType) {
+ public WebSocketServerHandler(List<String> supportedProtocols, int
maxFramePayloadLength, WebSocketFrameEncoderType encoderType, boolean
allowExtensions) {
this.supportedProtocols = supportedProtocols;
this.maxFramePayloadLength = maxFramePayloadLength;
this.encoderType = encoderType;
+ this.allowExtensions = allowExtensions;
}
@Override
@@ -81,7 +83,7 @@ public class WebSocketServerHandler extends
SimpleChannelInboundHandler<Object>
// Handshake
String supportedProtocolsCSV =
StringUtil.joinStringList(supportedProtocols, ",");
- WebSocketServerHandshakerFactory wsFactory = new
WebSocketServerHandshakerFactory(this.getWebSocketLocation(req),
supportedProtocolsCSV, false, maxFramePayloadLength);
+ WebSocketServerHandshakerFactory wsFactory = new
WebSocketServerHandshakerFactory(this.getWebSocketLocation(req),
supportedProtocolsCSV, allowExtensions, maxFramePayloadLength);
this.httpRequest = req;
this.handshaker = wsFactory.newHandshaker(req);
if (this.handshaker == null) {
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java
index f1f9644c12..f581f78f72 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java
@@ -49,7 +49,7 @@ public class WebSocketServerHandlerTest {
public void setup() throws Exception {
maxFramePayloadLength = 8192;
supportedProtocols = Arrays.asList("STOMP");
- spy = spy(new WebSocketServerHandler(supportedProtocols,
maxFramePayloadLength, WebSocketFrameEncoderType.BINARY));
+ spy = spy(new WebSocketServerHandler(supportedProtocols,
maxFramePayloadLength, WebSocketFrameEncoderType.BINARY, false));
}
@Test
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketConnectionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketConnectionTest.java
new file mode 100644
index 0000000000..d6a986b419
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketConnectionTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
+import
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClientOptions;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test connections via Web Sockets
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class AmqpWebSocketConnectionTest extends AmqpClientTestSupport {
+
+ @Parameter(index = 0)
+ public boolean supportWSCompression;
+
+ @Parameters(name = "supportWSCompression={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {{true}, {false}});
+ }
+
+ @Override
+ protected void configureAMQPAcceptorParameters(TransportConfiguration tc) {
+ tc.getParams().put("webSocketCompressionSupported",
supportWSCompression);
+ }
+
+ @TestTemplate
+ public void testClientConnectsWithWebSocketCompressionOn() throws Exception
{
+ testClientConnectsWithWebSockets(true);
+ }
+
+ @TestTemplate
+ public void testClientConnectsWithWebSocketCompressionOff() throws
Exception {
+ testClientConnectsWithWebSockets(false);
+ }
+
+ private void testClientConnectsWithWebSockets(boolean
clientAsksForCompression) throws Exception {
+ final ProtonTestClientOptions clientOpts = new ProtonTestClientOptions();
+
+ clientOpts.setUseWebSockets(true);
+ clientOpts.setWebSocketCompression(clientAsksForCompression);
+
+ try (ProtonTestClient client = new ProtonTestClient(clientOpts)) {
+ client.queueClientSaslAnonymousConnect();
+ client.remoteOpen().queue();
+ client.expectOpen();
+ client.remoteBegin().queue();
+ client.expectBegin();
+ client.connect("localhost", AMQP_PORT);
+
+ client.waitForScriptToComplete(5, TimeUnit.MINUTES);
+
+ if (clientAsksForCompression && supportWSCompression) {
+ assertTrue(client.isWSCompressionActive());
+ } else {
+ assertFalse(client.isWSCompressionActive());
+ }
+
+ client.expectAttach().ofSender();
+ client.expectAttach().ofReceiver();
+ client.expectFlow();
+
+ // Attach a sender and receiver
+ client.remoteAttach().ofReceiver()
+ .withName("ws-compression-test")
+ .withSource().withAddress(getQueueName())
+ .withCapabilities("queue").also()
+ .withTarget().and()
+ .now();
+ client.remoteFlow().withLinkCredit(10).now();
+ client.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("ws-compression-test")
+ .withTarget().withAddress(getQueueName())
+ .withCapabilities("queue").also()
+ .withSource().and()
+ .now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final String payload = "test-data:" + "A".repeat(1000);
+
+ // Broker sends message to subscription and acknowledges to sender
+ client.expectTransfer().withMessage().withValue(payload);
+ client.expectDisposition().withSettled(true).withState().accepted();
+
+ // Client sends message to queue with subscription
+ client.remoteTransfer().withDeliveryId(0)
+ .withBody().withValue(payload).also()
+ .now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ client.expectClose();
+ client.remoteClose().now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.close();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact