This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 03afbedfe3 ARTEMIS-4282 Large Header may break the broker
03afbedfe3 is described below

commit 03afbedfe3bcd23271dc857bf1548afbaaf60b21
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed May 17 15:54:03 2023 -0400

    ARTEMIS-4282 Large Header may break the broker
---
 .../artemis/core/journal/impl/JournalImpl.java     |  9 ++-
 .../core/server/impl/ServerSessionImpl.java        | 13 +++-
 .../integration/amqp/AmqpLargeMessageTest.java     | 78 ++++++++++++++++++++++
 3 files changed, 97 insertions(+), 3 deletions(-)

diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 724c9695bc..6de092d432 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -1263,6 +1263,13 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                       txID, id, recordType, record);
       }
 
+      JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, 
recordType, persister, record);
+      int encodeSize = addRecord.getEncodeSize();
+
+      if (encodeSize > getMaxRecordSize()) {
+         //The record size should be larger than max record size only on the 
large messages case.
+         throw 
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(encodeSize, 
getMaxRecordSize());
+      }
 
       appendExecutor.execute(new Runnable() {
 
@@ -1276,9 +1283,7 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                if (tx != null) {
                   tx.checkErrorCondition();
                }
-               JournalInternalRecord addRecord = new JournalAddRecordTX(true, 
txID, id, recordType, persister, record);
                // we need to calculate the encodeSize here, as it may use 
caches that are eliminated once the record is written
-               int encodeSize = addRecord.getEncodeSize();
                JournalFile usedFile = appendRecord(addRecord, false, false, 
tx, null);
 
                if (logger.isTraceEnabled()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 5720c49820..6f7fb3c897 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -77,6 +77,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
@@ -1914,7 +1915,17 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
             result = handleManagementMessage(tx, message, direct);
          } else {
-            result = doSend(tx, message, address, direct, senderName, 
noAutoCreateQueue, routingContext);
+            try {
+               result = doSend(tx, message, address, direct, senderName, 
noAutoCreateQueue, routingContext);
+            } catch (ActiveMQIOErrorException e) {
+               if (tx != null) {
+                  tx.markAsRollbackOnly(e);
+               }
+               if (message.isLargeMessage()) {
+                  ((LargeServerMessage)message).deleteFile();
+               }
+               throw e;
+            }
          }
 
          if (AuditLogger.isMessageLoggingEnabled()) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
index 94bb09c7fa..b9e6aa863c 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
@@ -41,6 +42,7 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.message.LargeBodyReader;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -49,6 +51,7 @@ import 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
 import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -521,6 +524,81 @@ public class AmqpLargeMessageTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   public void testLargeHeaderTXLargeBody() throws Exception {
+      Assume.assumeFalse(jdbc); // the checked rule with the property size 
will not be applied to JDBC, hence we skip the test
+      testLargeHeaderTX(true);
+   }
+
+   @Test
+   public void testLargeHeaderTXSmallBody() throws Exception {
+      Assume.assumeFalse(jdbc); // the checked rule with the property size 
will not be applied to JDBC, hence we skip the test
+      testLargeHeaderTX(false);
+   }
+
+   private void testLargeHeaderTX(boolean largeBody) throws Exception {
+      String testQueueName = RandomUtil.randomString();
+      server.createQueue(new 
QueueConfiguration(testQueueName).setRoutingType(RoutingType.ANYCAST));
+      ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:5672");
+
+      String largeString;
+      {
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < 1024 * 1024) {
+            buffer.append("This is a large string ");
+         }
+         largeString = buffer.toString();
+      }
+
+      String smallString = "small string";
+
+      String body = largeBody ? largeString : smallString;
+
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createQueue(testQueueName));
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         Message message = session.createTextMessage(body);
+         message.setStringProperty("test", largeString);
+         boolean failed = false;
+         try {
+            producer.send(message);
+            session.commit();
+         } catch (Exception expected) {
+            failed = true;
+         }
+         Assert.assertTrue(failed);
+      }
+
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createQueue(testQueueName));
+
+         Message message = session.createTextMessage(body);
+         message.setStringProperty("test", smallString);
+         producer.send(message);
+         session.commit();
+
+         connection.start();
+
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(testQueueName));
+         TextMessage recMessage =  (TextMessage) consumer.receive(5000);
+         Assert.assertEquals(smallString, 
recMessage.getStringProperty("test"));
+         Assert.assertEquals(body, recMessage.getText());
+         session.commit();
+
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+
+      org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(testQueueName);
+      Wait.assertEquals(0, serverQueue::getMessageCount);
+
+      File largeMessageFolder = 
server.getConfiguration().getLargeMessagesLocation();
+      File[] files = largeMessageFolder.listFiles();
+      Assert.assertTrue(files == null ? "Null Files" : "There are " + 
files.length + " files in the large message folder", files == null || 
files.length == 0);
+   }
+
+
    @Test(timeout = 60000)
    public void testSendSmallerMessages() throws Exception {
       for (int i = 512; i <= (8 * 1024); i += 512) {

Reply via email to