This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new b843e12852 ARTEMIS-2748 Support WebSocket compression in the transport 
pipeline
b843e12852 is described below

commit b843e128522eb36168350c303fb89434e6102a26
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Oct 14 10:15:55 2024 -0400

    ARTEMIS-2748 Support WebSocket compression in the transport pipeline
    
    Adds support for WebSocket compression using the netty server handler to
    enable per message compression and decompression as a transparent layer of
    the netty pipeine.
---
 .../remoting/impl/netty/TransportConstants.java    |   9 ++
 .../artemis/core/protocol/ProtocolHandler.java     |  18 ++-
 .../protocol/websocket/WebSocketServerHandler.java |   6 +-
 .../websocket/WebSocketServerHandlerTest.java      |   2 +-
 .../amqp/AmqpWebSocketConnectionTest.java          | 128 +++++++++++++++++++++
 5 files changed, 158 insertions(+), 5 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 7859fec4a6..91ebe95cd9 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -351,6 +351,15 @@ public class TransportConstants {
 
    public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10;
 
+   /*
+    * Defines if the WS acceptor allows a client to request compression via WS 
extensions for
+    * per message deflate. By default this is not enabled and the WS upgrade 
response will not
+    * carry any compression support headers when the client indicates it 
supports compression.
+    */
+   public static final String WEB_SOCKET_COMPRESSION_SUPPORTED = 
"webSocketCompressionSupported";
+
+   public static final boolean DEFAULT_WEB_SOCKET_COMPRESSION_SUPPORTED = 
false;
+
    public static final String QUIET_PERIOD = "quietPeriod";
 
    public static final String DISABLE_STOMP_SERVER_HEADER = 
"disableStompServerHeader";
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index 064c5023ee..e5efc05603 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -37,6 +37,8 @@ import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
+import 
io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
+
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.remoting.impl.netty.ConnectionCreator;
@@ -152,15 +154,27 @@ public class ProtocolHandler {
             FullHttpRequest request = (FullHttpRequest) msg;
             HttpHeaders headers = request.headers();
             String upgrade = headers.get("upgrade");
+
             if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) {
                int stompMaxFramePayloadLength = 
ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH,
 -1, nettyAcceptor.getConfiguration());
                if (stompMaxFramePayloadLength != -1) {
                   
ActiveMQServerLogger.LOGGER.deprecatedConfigurationOption(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH,
 TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH);
                }
                stompMaxFramePayloadLength = stompMaxFramePayloadLength != -1 ? 
stompMaxFramePayloadLength : 
TransportConstants.DEFAULT_WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH;
+
                int webSocketMaxFramePayloadLength = 
ConfigurationHelper.getIntProperty(TransportConstants.WEB_SOCKET_MAX_FRAME_PAYLOAD_LENGTH,
 -1, nettyAcceptor.getConfiguration());
-               String encoderConfigType = 
ConfigurationHelper.getStringProperty(TransportConstants.WEB_SOCKET_ENCODER_TYPE,
 TransportConstants.DEFAULT_WEB_SOCKET_ENCODER_TYPE, 
nettyAcceptor.getConfiguration());
-               ctx.pipeline().addLast("websocket-handler", new 
WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength 
!= -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength, 
WebSocketFrameEncoderType.valueOfType(encoderConfigType)));
+               webSocketMaxFramePayloadLength = webSocketMaxFramePayloadLength 
!= -1 ? webSocketMaxFramePayloadLength : stompMaxFramePayloadLength;
+
+               final boolean enableCompression = 
ConfigurationHelper.getBooleanProperty(
+                  TransportConstants.WEB_SOCKET_COMPRESSION_SUPPORTED, 
TransportConstants.DEFAULT_WEB_SOCKET_COMPRESSION_SUPPORTED, 
nettyAcceptor.getConfiguration());
+               final String encoderConfigType = 
ConfigurationHelper.getStringProperty(
+                  TransportConstants.WEB_SOCKET_ENCODER_TYPE, 
TransportConstants.DEFAULT_WEB_SOCKET_ENCODER_TYPE, 
nettyAcceptor.getConfiguration());
+
+               if (enableCompression) {
+                  ctx.pipeline().addLast(new 
WebSocketServerCompressionHandler());
+               }
+
+               ctx.pipeline().addLast("websocket-handler", new 
WebSocketServerHandler(websocketSubprotocolIds, webSocketMaxFramePayloadLength, 
WebSocketFrameEncoderType.valueOfType(encoderConfigType), enableCompression));
                ctx.pipeline().addLast(new ProtocolDecoder(false, false));
                ctx.pipeline().remove(this);
                ctx.pipeline().remove(HTTP_HANDLER);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
index e15fe21991..abde34b9fa 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
@@ -51,12 +51,14 @@ public class WebSocketServerHandler extends 
SimpleChannelInboundHandler<Object>
    private WebSocketServerHandshaker handshaker;
    private List<String> supportedProtocols;
    private int maxFramePayloadLength;
+   private boolean allowExtensions;
    private WebSocketFrameEncoderType encoderType;
 
-   public WebSocketServerHandler(List<String> supportedProtocols, int 
maxFramePayloadLength, WebSocketFrameEncoderType encoderType) {
+   public WebSocketServerHandler(List<String> supportedProtocols, int 
maxFramePayloadLength, WebSocketFrameEncoderType encoderType, boolean 
allowExtensions) {
       this.supportedProtocols = supportedProtocols;
       this.maxFramePayloadLength = maxFramePayloadLength;
       this.encoderType = encoderType;
+      this.allowExtensions = allowExtensions;
    }
 
    @Override
@@ -81,7 +83,7 @@ public class WebSocketServerHandler extends 
SimpleChannelInboundHandler<Object>
 
       // Handshake
       String supportedProtocolsCSV = 
StringUtil.joinStringList(supportedProtocols, ",");
-      WebSocketServerHandshakerFactory wsFactory = new 
WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), 
supportedProtocolsCSV, false, maxFramePayloadLength);
+      WebSocketServerHandshakerFactory wsFactory = new 
WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), 
supportedProtocolsCSV, allowExtensions, maxFramePayloadLength);
       this.httpRequest = req;
       this.handshaker = wsFactory.newHandshaker(req);
       if (this.handshaker == null) {
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java
index f1f9644c12..f581f78f72 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandlerTest.java
@@ -49,7 +49,7 @@ public class WebSocketServerHandlerTest {
    public void setup() throws Exception {
       maxFramePayloadLength = 8192;
       supportedProtocols = Arrays.asList("STOMP");
-      spy = spy(new WebSocketServerHandler(supportedProtocols, 
maxFramePayloadLength, WebSocketFrameEncoderType.BINARY));
+      spy = spy(new WebSocketServerHandler(supportedProtocols, 
maxFramePayloadLength, WebSocketFrameEncoderType.BINARY, false));
    }
 
    @Test
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketConnectionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketConnectionTest.java
new file mode 100644
index 0000000000..d6a986b419
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpWebSocketConnectionTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
+import 
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClientOptions;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test connections via Web Sockets
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class AmqpWebSocketConnectionTest extends AmqpClientTestSupport {
+
+   @Parameter(index = 0)
+   public boolean supportWSCompression;
+
+   @Parameters(name = "supportWSCompression={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {{true}, {false}});
+   }
+
+   @Override
+   protected void configureAMQPAcceptorParameters(TransportConfiguration tc) {
+      tc.getParams().put("webSocketCompressionSupported", 
supportWSCompression);
+   }
+
+   @TestTemplate
+   public void testClientConnectsWithWebSocketCompressionOn() throws Exception 
{
+      testClientConnectsWithWebSockets(true);
+   }
+
+   @TestTemplate
+   public void testClientConnectsWithWebSocketCompressionOff() throws 
Exception {
+      testClientConnectsWithWebSockets(false);
+   }
+
+   private void testClientConnectsWithWebSockets(boolean 
clientAsksForCompression) throws Exception {
+      final ProtonTestClientOptions clientOpts = new ProtonTestClientOptions();
+
+      clientOpts.setUseWebSockets(true);
+      clientOpts.setWebSocketCompression(clientAsksForCompression);
+
+      try (ProtonTestClient client = new ProtonTestClient(clientOpts)) {
+         client.queueClientSaslAnonymousConnect();
+         client.remoteOpen().queue();
+         client.expectOpen();
+         client.remoteBegin().queue();
+         client.expectBegin();
+         client.connect("localhost", AMQP_PORT);
+
+         client.waitForScriptToComplete(5, TimeUnit.MINUTES);
+
+         if (clientAsksForCompression && supportWSCompression) {
+            assertTrue(client.isWSCompressionActive());
+         } else {
+            assertFalse(client.isWSCompressionActive());
+         }
+
+         client.expectAttach().ofSender();
+         client.expectAttach().ofReceiver();
+         client.expectFlow();
+
+         // Attach a sender and receiver
+         client.remoteAttach().ofReceiver()
+                              .withName("ws-compression-test")
+                              .withSource().withAddress(getQueueName())
+                                           .withCapabilities("queue").also()
+                              .withTarget().and()
+                              .now();
+         client.remoteFlow().withLinkCredit(10).now();
+         client.remoteAttach().ofSender()
+                              .withInitialDeliveryCount(0)
+                              .withName("ws-compression-test")
+                              .withTarget().withAddress(getQueueName())
+                                           .withCapabilities("queue").also()
+                              .withSource().and()
+                              .now();
+
+         client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final String payload = "test-data:" + "A".repeat(1000);
+
+         // Broker sends message to subscription and acknowledges to sender
+         client.expectTransfer().withMessage().withValue(payload);
+         client.expectDisposition().withSettled(true).withState().accepted();
+
+         // Client sends message to queue with subscription
+         client.remoteTransfer().withDeliveryId(0)
+                                .withBody().withValue(payload).also()
+                                .now();
+
+         client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         client.expectClose();
+         client.remoteClose().now();
+
+         client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         client.close();
+      }
+   }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to