This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new bfb408f590 ARTEMIS-4993 deal w/messages that become too large via divert transformer
bfb408f590 is described below
commit bfb408f5905ad9ef5f0375d466894c276b09cca5
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Apr 30 14:17:31 2025 -0500
ARTEMIS-4993 deal w/messages that become too large via divert transformer
---
.../artemis/core/server/impl/DivertImpl.java | 6 +-
.../tests/integration/divert/DivertTest.java | 77 ++++++++++++++++++++--
2 files changed, 74 insertions(+), 9 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 8890e400e4..526971d54b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -16,11 +16,14 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import java.lang.invoke.MethodHandles;
+
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
@@ -28,7 +31,6 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
/**
* A DivertImpl simply diverts a message to a different forwardAddress
@@ -142,7 +144,7 @@ public class DivertImpl implements Divert {
}
if (transformer != null) {
- copy = transformer.transform(copy);
+ copy = LargeServerMessageImpl.checkLargeMessage(transformer.transform(copy), this.storageManager);
}
// We call reencode at the end only, in a single call.
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index f5bfc9082a..c6d6199b79 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -16,12 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.divert;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.fail;
-
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
@@ -33,6 +27,8 @@ import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
@@ -49,29 +45,41 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
+import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
+import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
-import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
public class DivertTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -1779,6 +1787,61 @@ public class DivertTest extends ActiveMQTestBase {
assertEquals("testAddress" + (COUNT - 1), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
}
+ @Test
+ public void testTransformedToLarge() throws Exception {
+ final String address = "address";
+ final SimpleString queue = SimpleString.of("queue");
+ final String forwardingAddress = "forwardingAddress";
+ final SimpleString forwardingQueue = SimpleString.of("forwardingQueue");
+ final int journalBufferSize = 1024;
+ final int headerCount = 5;
+
+ ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setJournalBufferSize_NIO(journalBufferSize).setJournalType(JournalType.NIO), true));
+ server.start();
+
+ // configure the transformer to add headers to increase the size of the message past the journal buffer size
+ Map<String, String> headers = new HashMap<>();
+ for (int i = 0; i < headerCount; i++) {
+ headers.put(RandomUtil.randomAlphaNumericString(32), RandomUtil.randomAlphaNumericString(32));
+ }
+ TransformerConfiguration transformerConfiguration = new TransformerConfiguration();
+ transformerConfiguration.setClassName(AddHeadersTransformer.class.getName());
+ transformerConfiguration.setProperties(headers);
+
+ server.createQueue(QueueConfiguration.of(queue).setAddress(address).setRoutingType(RoutingType.ANYCAST));
+ server.createQueue(QueueConfiguration.of(forwardingQueue).setAddress(forwardingAddress).setRoutingType(RoutingType.ANYCAST));
+ server.deployDivert(new DivertConfiguration()
+ .setName("divert")
+ .setAddress(address)
+ .setForwardingAddress(forwardingAddress)
+ .setTransformerConfiguration(transformerConfiguration));
+
+ ServerLocator locator = createInVMNonHALocator();
+ locator.setMinLargeMessageSize(Integer.MAX_VALUE);
+ ClientSessionFactory sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(false, true, true);
+ session.start();
+
+ ClientProducer producer = session.createProducer(SimpleString.of(address));
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientConsumer divertedConsumer = session.createConsumer(forwardingQueue);
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeBytes(RandomUtil.randomBytes(journalBufferSize / 4));
+ producer.send(message);
+
+ // ensure the non-diverted message is not turned into a large message
+ message = consumer.receive(DivertTest.TIMEOUT);
+ assertNotNull(message);
+ message.acknowledge();
+ assertFalse(message instanceof ClientLargeMessageImpl);
+
+ // ensure the diverted message is turned into a large message
+ message = divertedConsumer.receive(DivertTest.TIMEOUT);
+ assertNotNull(message);
+ message.acknowledge();
+ assertTrue(message instanceof ClientLargeMessageImpl);
+ }
+
@Test
public void testDivertToNewAddress() throws Exception {
final String queueName = "queue";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact