This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new bfb408f590 ARTEMIS-4993 deal w/messages that become too large via divert transformer
bfb408f590 is described below

commit bfb408f5905ad9ef5f0375d466894c276b09cca5
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Apr 30 14:17:31 2025 -0500

    ARTEMIS-4993 deal w/messages that become too large via divert transformer
---
 .../artemis/core/server/impl/DivertImpl.java       |  6 +-
 .../tests/integration/divert/DivertTest.java       | 77 ++++++++++++++++++++--
 2 files changed, 74 insertions(+), 9 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 8890e400e4..526971d54b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -16,11 +16,14 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import java.lang.invoke.MethodHandles;
+
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import 
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.Divert;
@@ -28,7 +31,6 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 /**
  * A DivertImpl simply diverts a message to a different forwardAddress
@@ -142,7 +144,7 @@ public class DivertImpl implements Divert {
             }
 
             if (transformer != null) {
-               copy = transformer.transform(copy);
+               copy = 
LargeServerMessageImpl.checkLargeMessage(transformer.transform(copy), 
this.storageManager);
             }
 
             // We call reencode at the end only, in a single call.
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index f5bfc9082a..c6d6199b79 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.divert;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.fail;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.MessageConsumer;
@@ -33,6 +27,8 @@ import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -49,29 +45,41 @@ import 
org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import 
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.Divert;
+import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
+import 
org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
-import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class DivertTest extends ActiveMQTestBase {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -1779,6 +1787,61 @@ public class DivertTest extends ActiveMQTestBase {
       assertEquals("testAddress" + (COUNT - 1), 
message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
    }
 
+   @Test
+   public void testTransformedToLarge() throws Exception {
+      final String address = "address";
+      final SimpleString queue = SimpleString.of("queue");
+      final String forwardingAddress = "forwardingAddress";
+      final SimpleString forwardingQueue = SimpleString.of("forwardingQueue");
+      final int journalBufferSize = 1024;
+      final int headerCount = 5;
+
+      ActiveMQServer server = 
addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setJournalBufferSize_NIO(journalBufferSize).setJournalType(JournalType.NIO),
 true));
+      server.start();
+
+      // configure the transformer to add headers to increase the size of the 
message past the journal buffer size
+      Map<String, String> headers = new HashMap<>();
+      for (int i = 0; i < headerCount; i++) {
+         headers.put(RandomUtil.randomAlphaNumericString(32), 
RandomUtil.randomAlphaNumericString(32));
+      }
+      TransformerConfiguration transformerConfiguration = new 
TransformerConfiguration();
+      
transformerConfiguration.setClassName(AddHeadersTransformer.class.getName());
+      transformerConfiguration.setProperties(headers);
+
+      
server.createQueue(QueueConfiguration.of(queue).setAddress(address).setRoutingType(RoutingType.ANYCAST));
+      
server.createQueue(QueueConfiguration.of(forwardingQueue).setAddress(forwardingAddress).setRoutingType(RoutingType.ANYCAST));
+      server.deployDivert(new DivertConfiguration()
+                             .setName("divert")
+                             .setAddress(address)
+                             .setForwardingAddress(forwardingAddress)
+                             
.setTransformerConfiguration(transformerConfiguration));
+
+      ServerLocator locator = createInVMNonHALocator();
+      locator.setMinLargeMessageSize(Integer.MAX_VALUE);
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(false, true, true);
+      session.start();
+
+      ClientProducer producer = 
session.createProducer(SimpleString.of(address));
+      ClientConsumer consumer = session.createConsumer(queue);
+      ClientConsumer divertedConsumer = 
session.createConsumer(forwardingQueue);
+      ClientMessage message = session.createMessage(true);
+      
message.getBodyBuffer().writeBytes(RandomUtil.randomBytes(journalBufferSize / 
4));
+      producer.send(message);
+
+      // ensure the non-diverted message is not turned into a large message
+      message = consumer.receive(DivertTest.TIMEOUT);
+      assertNotNull(message);
+      message.acknowledge();
+      assertFalse(message instanceof ClientLargeMessageImpl);
+
+      // ensure the diverted message is turned into a large message
+      message = divertedConsumer.receive(DivertTest.TIMEOUT);
+      assertNotNull(message);
+      message.acknowledge();
+      assertTrue(message instanceof ClientLargeMessageImpl);
+   }
+
    @Test
    public void testDivertToNewAddress() throws Exception {
       final String queueName = "queue";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to