This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new e7ed4700e1 ARTEMIS-5054 Fix concurrent access issue of large message
to Stomp Frame
e7ed4700e1 is described below
commit e7ed4700e14777ccae4e570b5d3fbe6a918b9d14
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Sep 25 08:24:03 2024 -0400
ARTEMIS-5054 Fix concurrent access issue of large message to Stomp Frame
When converting a large server message to an outgoing STOMP frame the
converter
is allowing unsafe concurrent access to the large message internals which
leads
to failures on message deliver as the state is out of sync amongst the
dispatch
threads.
---
.../core/protocol/stomp/StompConnection.java | 2 +-
.../protocol/stomp/VersionedStompFrameHandler.java | 76 ++++++++++++++++++----
.../protocol/stomp/v12/StompFrameHandlerV12.java | 3 +-
.../artemis/tests/integration/stomp/StompTest.java | 50 ++++++++++++++
4 files changed, 116 insertions(+), 15 deletions(-)
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 742c2a8d79..bf0d09f955 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -626,7 +626,7 @@ public final class StompConnection extends
AbstractRemotingConnection {
public StompFrame createStompMessage(ICoreMessage message,
StompSubscription subscription,
ServerConsumer consumer,
- int deliveryCount) {
+ int deliveryCount) throws
ActiveMQException {
return frameHandler.createMessageFrame(message, subscription, consumer,
deliveryCount);
}
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 77b6c78110..b338456b9e 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -16,14 +16,18 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp.Headers;
import
org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
@@ -36,6 +40,8 @@ import static
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
public abstract class VersionedStompFrameHandler {
+ protected static byte[] EMPTY_BODY = new byte[0];
+
protected StompConnection connection;
protected StompDecoder decoder;
@@ -324,34 +330,79 @@ public abstract class VersionedStompFrameHandler {
public StompFrame createMessageFrame(ICoreMessage serverMessage,
StompSubscription subscription,
ServerConsumer consumer,
- int deliveryCount) {
- StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
+ int deliveryCount) throws
ActiveMQException {
+ final StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
if (subscription.getID() != null) {
frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION,
subscription.getID());
}
- ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
+ if (serverMessage.isLargeMessage()) {
+ populateFrameBodyFromLargeMessage(frame, serverMessage);
+ } else {
+ populateFrameBodyFromMessage(frame, serverMessage);
+ }
+
+ frame.addHeader(Stomp.Headers.Message.MESSAGE_ID, new
StringBuilder(41).append(consumer.getID()).append(StompSession.MESSAGE_ID_SEPARATOR).append(serverMessage.getMessageID()).toString());
+ StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
+
+ return frame;
+ }
- byte[] data = new byte[buffer.writerIndex()];
+ private void populateFrameBodyFromMessage(StompFrame frame, ICoreMessage
serverMessage) {
+ final ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
+ final int bodyLength = buffer.readableBytes();
- if (data.length > 0) {
+ if (bodyLength > 0) {
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE) {
- frame.addHeader(Headers.CONTENT_LENGTH,
String.valueOf(data.length));
+ final byte[] data = new byte[bodyLength];
+
buffer.readBytes(data);
+
+ frame.addHeader(Headers.CONTENT_LENGTH,
String.valueOf(bodyLength));
+ frame.setByteBody(data);
} else {
- SimpleString text = buffer.readNullableSimpleString();
+ final SimpleString text = buffer.readNullableSimpleString();
+
if (text != null) {
- data = text.toString().getBytes(StandardCharsets.UTF_8);
+
frame.setByteBody(text.toString().getBytes(StandardCharsets.UTF_8));
}
}
+ } else {
+ frame.setByteBody(EMPTY_BODY);
}
- frame.setByteBody(data);
+ }
- frame.addHeader(Stomp.Headers.Message.MESSAGE_ID, new
StringBuilder(41).append(consumer.getID()).append(StompSession.MESSAGE_ID_SEPARATOR).append(serverMessage.getMessageID()).toString());
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
+ private void populateFrameBodyFromLargeMessage(StompFrame frame,
ICoreMessage serverMessage) throws ActiveMQException {
+ try (LargeBodyReader reader = serverMessage.getLargeBodyReader()) {
+ reader.open();
- return frame;
+ final int bodyLength = (int) reader.getSize();
+
+ if (bodyLength > 0) {
+ final byte[] bodyBytes = new byte[bodyLength];
+ final ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyBytes);
+
+ reader.readInto(bodyBuffer);
+
+ if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH)
|| serverMessage.getType() == Message.BYTES_TYPE) {
+ frame.addHeader(Headers.CONTENT_LENGTH,
String.valueOf(bodyLength));
+ frame.setByteBody(bodyBytes);
+ } else {
+ final ActiveMQBuffer buffer =
ActiveMQBuffers.wrappedBuffer(bodyBuffer);
+
+ buffer.writerIndex(bodyLength);
+
+ final SimpleString text = buffer.readNullableSimpleString();
+
+ if (text != null) {
+
frame.setByteBody(text.toString().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ } else {
+ frame.setByteBody(EMPTY_BODY);
+ }
+ }
}
/**
@@ -391,5 +442,4 @@ public abstract class VersionedStompFrameHandler {
response.addHeader(Stomp.Headers.Error.MESSAGE, responseText);
return response;
}
-
}
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 6ee5fe382c..9c3d43ad41 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
@@ -52,7 +53,7 @@ public class StompFrameHandlerV12 extends
StompFrameHandlerV11 {
public StompFrame createMessageFrame(ICoreMessage serverMessage,
StompSubscription subscription,
ServerConsumer consumer,
- int deliveryCount) {
+ int deliveryCount) throws
ActiveMQException {
StompFrame frame = super.createMessageFrame(serverMessage, subscription,
consumer, deliveryCount);
if
(!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 02346976ef..c4c25dcb83 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -2120,4 +2120,54 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
+ @Test
+ public void testMultipleSubscriptionsOnMulticastAddressReadSameMessage()
throws Exception {
+ doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(10);
+ }
+
+ @Test
+ public void
testMultipleSubscriptionsOnMulticastAddressReadSameLargeMessage() throws
Exception {
+ doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(120_000);
+ }
+
+ private void
doTestMultipleSubscriptionsOnMulticastAddressReadSameMessage(int size) throws
Exception {
+ final String body = "A".repeat(size);
+
+ final StompClientConnection conn_r1 =
StompClientConnectionFactory.createClientConnection(uri);
+ final StompClientConnection conn_r2 =
StompClientConnectionFactory.createClientConnection(uri);
+
+ try {
+ conn_r1.connect(defUser, defPass);
+ subscribeTopic(conn_r1, null, null, null);
+
+ conn_r2.connect(defUser, defPass);
+ subscribeTopic(conn_r2, null, null, null);
+
+ // Sender
+ conn.connect(defUser, defPass);
+ send(conn, getTopicPrefix() + getTopicName(), null, body, true,
RoutingType.MULTICAST);
+
+ ClientStompFrame frame1 = conn_r1.receiveFrame(10000);
+ ClientStompFrame frame2 = conn_r2.receiveFrame(10000);
+
+ assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
+
+ assertEquals(getTopicPrefix() + getTopicName(),
frame2.getHeader(Stomp.Headers.Send.DESTINATION));
+ assertEquals(getTopicPrefix() + getTopicName(),
frame1.getHeader(Stomp.Headers.Send.DESTINATION));
+
+ assertEquals(RoutingType.MULTICAST.toString(),
frame2.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+ assertEquals(RoutingType.MULTICAST.toString(),
frame1.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+
+
assertNull(frame2.getHeader(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE.toString()));
+
assertNull(frame1.getHeader(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE.toString()));
+
+ assertEquals(body, frame2.getBody());
+ assertEquals(body, frame1.getBody());
+ } finally {
+ conn.disconnect();
+ conn_r1.disconnect();
+ conn_r2.disconnect();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact