This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 03afbedfe3 ARTEMIS-4282 Large Header may break the broker
03afbedfe3 is described below
commit 03afbedfe3bcd23271dc857bf1548afbaaf60b21
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed May 17 15:54:03 2023 -0400
ARTEMIS-4282 Large Header may break the broker
---
.../artemis/core/journal/impl/JournalImpl.java | 9 ++-
.../core/server/impl/ServerSessionImpl.java | 13 +++-
.../integration/amqp/AmqpLargeMessageTest.java | 78 ++++++++++++++++++++++
3 files changed, 97 insertions(+), 3 deletions(-)
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 724c9695bc..6de092d432 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -1263,6 +1263,13 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
txID, id, recordType, record);
}
+ JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
+ int encodeSize = addRecord.getEncodeSize();
+
+ if (encodeSize > getMaxRecordSize()) {
+ //The record size should be larger than max record size only on the large messages case.
+ throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(encodeSize, getMaxRecordSize());
+ }
appendExecutor.execute(new Runnable() {
@@ -1276,9 +1283,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
if (tx != null) {
tx.checkErrorCondition();
}
- JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
// we need to calculate the encodeSize here, as it may use caches that are eliminated once the record is written
- int encodeSize = addRecord.getEncodeSize();
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
if (logger.isTraceEnabled()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 5720c49820..6f7fb3c897 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -77,6 +77,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
@@ -1914,7 +1915,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
result = handleManagementMessage(tx, message, direct);
} else {
- result = doSend(tx, message, address, direct, senderName, noAutoCreateQueue, routingContext);
+ try {
+ result = doSend(tx, message, address, direct, senderName, noAutoCreateQueue, routingContext);
+ } catch (ActiveMQIOErrorException e) {
+ if (tx != null) {
+ tx.markAsRollbackOnly(e);
+ }
+ if (message.isLargeMessage()) {
+ ((LargeServerMessage)message).deleteFile();
+ }
+ throw e;
+ }
}
if (AuditLogger.isMessageLoggingEnabled()) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
index 94bb09c7fa..b9e6aa863c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -41,6 +42,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -49,6 +51,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -521,6 +524,81 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
}
}
+ @Test
+ public void testLargeHeaderTXLargeBody() throws Exception {
+ Assume.assumeFalse(jdbc); // the checked rule with the property size will not be applied to JDBC, hence we skip the test
+ testLargeHeaderTX(true);
+ }
+
+ @Test
+ public void testLargeHeaderTXSmallBody() throws Exception {
+ Assume.assumeFalse(jdbc); // the checked rule with the property size will not be applied to JDBC, hence we skip the test
+ testLargeHeaderTX(false);
+ }
+
+ private void testLargeHeaderTX(boolean largeBody) throws Exception {
+ String testQueueName = RandomUtil.randomString();
+ server.createQueue(new QueueConfiguration(testQueueName).setRoutingType(RoutingType.ANYCAST));
+ ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
+
+ String largeString;
+ {
+ StringBuffer buffer = new StringBuffer();
+ while (buffer.length() < 1024 * 1024) {
+ buffer.append("This is a large string ");
+ }
+ largeString = buffer.toString();
+ }
+
+ String smallString = "small string";
+
+ String body = largeBody ? largeString : smallString;
+
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(session.createQueue(testQueueName));
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ Message message = session.createTextMessage(body);
+ message.setStringProperty("test", largeString);
+ boolean failed = false;
+ try {
+ producer.send(message);
+ session.commit();
+ } catch (Exception expected) {
+ failed = true;
+ }
+ Assert.assertTrue(failed);
+ }
+
+ try (Connection connection = cf.createConnection()) {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(session.createQueue(testQueueName));
+
+ Message message = session.createTextMessage(body);
+ message.setStringProperty("test", smallString);
+ producer.send(message);
+ session.commit();
+
+ connection.start();
+
+ MessageConsumer consumer = session.createConsumer(session.createQueue(testQueueName));
+ TextMessage recMessage = (TextMessage) consumer.receive(5000);
+ Assert.assertEquals(smallString, recMessage.getStringProperty("test"));
+ Assert.assertEquals(body, recMessage.getText());
+ session.commit();
+
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+
+ org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(testQueueName);
+ Wait.assertEquals(0, serverQueue::getMessageCount);
+
+ File largeMessageFolder = server.getConfiguration().getLargeMessagesLocation();
+ File[] files = largeMessageFolder.listFiles();
+ Assert.assertTrue(files == null ? "Null Files" : "There are " + files.length + " files in the large message folder", files == null || files.length == 0);
+ }
+
+
@Test(timeout = 60000)
public void testSendSmallerMessages() throws Exception {
for (int i = 512; i <= (8 * 1024); i += 512) {