This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new e7ed4700e1 ARTEMIS-5054 Fix concurrent access issue of large message 
to Stomp Frame
e7ed4700e1 is described below

commit e7ed4700e14777ccae4e570b5d3fbe6a918b9d14
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Sep 25 08:24:03 2024 -0400

    ARTEMIS-5054 Fix concurrent access issue of large message to Stomp Frame
    
    When converting a large server message to an outgoing STOMP frame the 
converter
    is allowing unsafe concurrent access to the large message internals which 
leads
    to failures on message deliver as the state is out of sync amongst the 
dispatch
    threads.
---
 .../core/protocol/stomp/StompConnection.java       |  2 +-
 .../protocol/stomp/VersionedStompFrameHandler.java | 76 ++++++++++++++++++----
 .../protocol/stomp/v12/StompFrameHandlerV12.java   |  3 +-
 .../artemis/tests/integration/stomp/StompTest.java | 50 ++++++++++++++
 4 files changed, 116 insertions(+), 15 deletions(-)

diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 742c2a8d79..bf0d09f955 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -626,7 +626,7 @@ public final class StompConnection extends 
AbstractRemotingConnection {
    public StompFrame createStompMessage(ICoreMessage message,
                                         StompSubscription subscription,
                                         ServerConsumer consumer,
-                                        int deliveryCount) {
+                                        int deliveryCount) throws 
ActiveMQException {
       return frameHandler.createMessageFrame(message, subscription, consumer, 
deliveryCount);
    }
 
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 77b6c78110..b338456b9e 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -16,14 +16,18 @@
  */
 package org.apache.activemq.artemis.core.protocol.stomp;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.LargeBodyReader;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers;
 import 
org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
@@ -36,6 +40,8 @@ import static 
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
 
 public abstract class VersionedStompFrameHandler {
 
+   protected static byte[] EMPTY_BODY = new byte[0];
+
    protected StompConnection connection;
    protected StompDecoder decoder;
 
@@ -324,34 +330,79 @@ public abstract class VersionedStompFrameHandler {
    public StompFrame createMessageFrame(ICoreMessage serverMessage,
                                         StompSubscription subscription,
                                         ServerConsumer consumer,
-                                        int deliveryCount) {
-      StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
+                                        int deliveryCount) throws 
ActiveMQException {
+      final StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
 
       if (subscription.getID() != null) {
          frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, 
subscription.getID());
       }
 
-      ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
+      if (serverMessage.isLargeMessage()) {
+         populateFrameBodyFromLargeMessage(frame, serverMessage);
+      } else {
+         populateFrameBodyFromMessage(frame, serverMessage);
+      }
+
+      frame.addHeader(Stomp.Headers.Message.MESSAGE_ID, new 
StringBuilder(41).append(consumer.getID()).append(StompSession.MESSAGE_ID_SEPARATOR).append(serverMessage.getMessageID()).toString());
+      StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, 
deliveryCount);
+
+      return frame;
+   }
 
-      byte[] data = new byte[buffer.writerIndex()];
+   private void populateFrameBodyFromMessage(StompFrame frame, ICoreMessage 
serverMessage) {
+      final ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
+      final int bodyLength = buffer.readableBytes();
 
-      if (data.length > 0) {
+      if (bodyLength > 0) {
          if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || 
serverMessage.getType() == Message.BYTES_TYPE) {
-            frame.addHeader(Headers.CONTENT_LENGTH, 
String.valueOf(data.length));
+            final byte[] data = new byte[bodyLength];
+
             buffer.readBytes(data);
+
+            frame.addHeader(Headers.CONTENT_LENGTH, 
String.valueOf(bodyLength));
+            frame.setByteBody(data);
          } else {
-            SimpleString text = buffer.readNullableSimpleString();
+            final SimpleString text = buffer.readNullableSimpleString();
+
             if (text != null) {
-               data = text.toString().getBytes(StandardCharsets.UTF_8);
+               
frame.setByteBody(text.toString().getBytes(StandardCharsets.UTF_8));
             }
          }
+      } else {
+         frame.setByteBody(EMPTY_BODY);
       }
-      frame.setByteBody(data);
+   }
 
-      frame.addHeader(Stomp.Headers.Message.MESSAGE_ID, new 
StringBuilder(41).append(consumer.getID()).append(StompSession.MESSAGE_ID_SEPARATOR).append(serverMessage.getMessageID()).toString());
-      StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, 
deliveryCount);
+   private void populateFrameBodyFromLargeMessage(StompFrame frame, 
ICoreMessage serverMessage) throws ActiveMQException {
+      try (LargeBodyReader reader = serverMessage.getLargeBodyReader()) {
+         reader.open();
 
-      return frame;
+         final int bodyLength = (int) reader.getSize();
+
+         if (bodyLength > 0) {
+            final byte[] bodyBytes = new byte[bodyLength];
+            final ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyBytes);
+
+            reader.readInto(bodyBuffer);
+
+            if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) 
|| serverMessage.getType() == Message.BYTES_TYPE) {
+               frame.addHeader(Headers.CONTENT_LENGTH, 
String.valueOf(bodyLength));
+               frame.setByteBody(bodyBytes);
+            } else {
+               final ActiveMQBuffer buffer = 
ActiveMQBuffers.wrappedBuffer(bodyBuffer);
+
+               buffer.writerIndex(bodyLength);
+
+               final SimpleString text = buffer.readNullableSimpleString();
+
+               if (text != null) {
+                  
frame.setByteBody(text.toString().getBytes(StandardCharsets.UTF_8));
+               }
+            }
+         } else {
+            frame.setByteBody(EMPTY_BODY);
+         }
+      }
    }
 
    /**
@@ -391,5 +442,4 @@ public abstract class VersionedStompFrameHandler {
       response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
       return response;
    }
-
 }
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 6ee5fe382c..9c3d43ad41 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
 
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
@@ -52,7 +53,7 @@ public class StompFrameHandlerV12 extends 
StompFrameHandlerV11 {
    public StompFrame createMessageFrame(ICoreMessage serverMessage,
                                         StompSubscription subscription,
                                         ServerConsumer consumer,
-                                        int deliveryCount) {
+                                        int deliveryCount) throws 
ActiveMQException {
       StompFrame frame = super.createMessageFrame(serverMessage, subscription, 
consumer, deliveryCount);
 
       if 
(!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 02346976ef..c4c25dcb83 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -2120,4 +2120,54 @@ public class StompTest extends StompTestBase {
       conn.disconnect();
    }
 
+   @Test
+   public void testMultipleSubscriptionsOnMulticastAddressReadSameMessage() 
throws Exception {
+      doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(10);
+   }
+
+   @Test
+   public void 
testMultipleSubscriptionsOnMulticastAddressReadSameLargeMessage() throws 
Exception {
+      doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(120_000);
+   }
+
+   private void 
doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(int size) throws 
Exception {
+      final String body = "A".repeat(size);
+
+      final StompClientConnection conn_r1 = 
StompClientConnectionFactory.createClientConnection(uri);
+      final StompClientConnection conn_r2 = 
StompClientConnectionFactory.createClientConnection(uri);
+
+      try {
+         conn_r1.connect(defUser, defPass);
+         subscribeTopic(conn_r1, null, null, null);
+
+         conn_r2.connect(defUser, defPass);
+         subscribeTopic(conn_r2, null, null, null);
+
+         // Sender
+         conn.connect(defUser, defPass);
+         send(conn, getTopicPrefix() + getTopicName(), null, body, true, 
RoutingType.MULTICAST);
+
+         ClientStompFrame frame1 = conn_r1.receiveFrame(10000);
+         ClientStompFrame frame2 = conn_r2.receiveFrame(10000);
+
+         assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
+         assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
+
+         assertEquals(getTopicPrefix() + getTopicName(), 
frame2.getHeader(Stomp.Headers.Send.DESTINATION));
+         assertEquals(getTopicPrefix() + getTopicName(), 
frame1.getHeader(Stomp.Headers.Send.DESTINATION));
+
+         assertEquals(RoutingType.MULTICAST.toString(), 
frame2.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+         assertEquals(RoutingType.MULTICAST.toString(), 
frame1.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+
+         
assertNull(frame2.getHeader(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE.toString()));
+         
assertNull(frame1.getHeader(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE.toString()));
+
+         assertEquals(body, frame2.getBody());
+         assertEquals(body, frame1.getBody());
+      } finally {
+         conn.disconnect();
+         conn_r1.disconnect();
+         conn_r2.disconnect();
+      }
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to